Добавлена защита от атак ReDoS в обработке регулярных выражений с ограничением длины паттерна и таймаутом компиляции. Оптимизирован алгоритм ограничения частоты API-запросов с использованием токенов. Улучшено логирование ошибок при получении данных о торрентах и добавлена обработка паник в колбэках завершения загрузок.
This commit is contained in:
@@ -18,6 +18,13 @@ import (
|
|||||||
"transmission-telegram/pkg/utils"
|
"transmission-telegram/pkg/utils"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
const (
|
||||||
|
// maxRegexPatternLength limits regex pattern length to prevent ReDoS attacks
|
||||||
|
maxRegexPatternLength = 1000
|
||||||
|
// regexCompileTimeout is the maximum time allowed for regex compilation
|
||||||
|
regexCompileTimeout = 5 * time.Second
|
||||||
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
trackerRegex = regexp.MustCompile(`[https?|udp]://([^:/]*)`)
|
trackerRegex = regexp.MustCompile(`[https?|udp]://([^:/]*)`)
|
||||||
// bufferPool is a pool for reusing bytes.Buffer instances
|
// bufferPool is a pool for reusing bytes.Buffer instances
|
||||||
@@ -31,25 +38,66 @@ var (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// getCompiledRegex gets or compiles a regex pattern with caching
|
// getCompiledRegex gets or compiles a regex pattern with caching
|
||||||
|
// Includes protection against ReDoS attacks
|
||||||
func getCompiledRegex(pattern string) (*regexp.Regexp, error) {
|
func getCompiledRegex(pattern string) (*regexp.Regexp, error) {
|
||||||
|
// Validate pattern length to prevent ReDoS
|
||||||
|
if len(pattern) > maxRegexPatternLength {
|
||||||
|
return nil, fmt.Errorf("regex pattern too long (max %d characters)", maxRegexPatternLength)
|
||||||
|
}
|
||||||
|
|
||||||
// Check cache first
|
// Check cache first
|
||||||
if cached, ok := regexCache.Load(pattern); ok {
|
if cached, ok := regexCache.Load(pattern); ok {
|
||||||
return cached.(*regexp.Regexp), nil
|
// Safe type assertion with check
|
||||||
|
if regx, ok := cached.(*regexp.Regexp); ok {
|
||||||
|
return regx, nil
|
||||||
|
}
|
||||||
|
// Type mismatch - remove from cache and continue
|
||||||
|
regexCache.Delete(pattern)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Compile and cache
|
// Compile with timeout protection
|
||||||
regx, err := regexp.Compile("(?i)" + pattern)
|
ctx, cancel := context.WithTimeout(context.Background(), regexCompileTimeout)
|
||||||
if err != nil {
|
defer cancel()
|
||||||
return nil, err
|
|
||||||
|
type result struct {
|
||||||
|
regx *regexp.Regexp
|
||||||
|
err error
|
||||||
}
|
}
|
||||||
|
resultCh := make(chan result, 1)
|
||||||
regexCache.Store(pattern, regx)
|
|
||||||
return regx, nil
|
go func() {
|
||||||
|
regx, err := regexp.Compile("(?i)" + pattern)
|
||||||
|
resultCh <- result{regx: regx, err: err}
|
||||||
|
}()
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return nil, fmt.Errorf("regex compilation timeout (pattern may be too complex): %s", pattern[:min(len(pattern), 50)])
|
||||||
|
case res := <-resultCh:
|
||||||
|
if res.err != nil {
|
||||||
|
return nil, fmt.Errorf("regex compilation error: %w", res.err)
|
||||||
|
}
|
||||||
|
regexCache.Store(pattern, res.regx)
|
||||||
|
return res.regx, nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// min returns the minimum of two integers
|
||||||
|
func min(a, b int) int {
|
||||||
|
if a < b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
// getBuffer gets a buffer from the pool
|
// getBuffer gets a buffer from the pool
|
||||||
func getBuffer() *bytes.Buffer {
|
func getBuffer() *bytes.Buffer {
|
||||||
return bufferPool.Get().(*bytes.Buffer)
|
item := bufferPool.Get()
|
||||||
|
if buf, ok := item.(*bytes.Buffer); ok {
|
||||||
|
return buf
|
||||||
|
}
|
||||||
|
// Type mismatch - return a new buffer
|
||||||
|
return new(bytes.Buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
// putBuffer returns a buffer to the pool after resetting it
|
// putBuffer returns a buffer to the pool after resetting it
|
||||||
@@ -828,6 +876,7 @@ func (b *Bot) liveUpdateTorrents(ctx context.Context, chatID int64, msgID int, f
|
|||||||
|
|
||||||
torrents, err := b.client.GetTorrents()
|
torrents, err := b.client.GetTorrents()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.logger.Printf("[ERROR] Failed to get torrents for live update: %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -864,6 +913,7 @@ func (b *Bot) liveUpdateActive(ctx context.Context, chatID int64, msgID int) {
|
|||||||
|
|
||||||
torrents, err := b.client.GetTorrents()
|
torrents, err := b.client.GetTorrents()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.logger.Printf("[ERROR] Failed to get torrents for live update: %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -892,7 +942,11 @@ func (b *Bot) liveUpdateActive(ctx context.Context, chatID int64, msgID int) {
|
|||||||
case <-time.After(b.cfg.Interval):
|
case <-time.After(b.cfg.Interval):
|
||||||
}
|
}
|
||||||
|
|
||||||
torrents, _ := b.client.GetTorrents()
|
torrents, err := b.client.GetTorrents()
|
||||||
|
if err != nil {
|
||||||
|
b.logger.Printf("[ERROR] Failed to get torrents for final live update: %s", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
buf := getBuffer()
|
buf := getBuffer()
|
||||||
defer putBuffer(buf)
|
defer putBuffer(buf)
|
||||||
|
|
||||||
@@ -921,6 +975,7 @@ func (b *Bot) liveUpdateInfo(ctx context.Context, chatID int64, msgID int, torre
|
|||||||
|
|
||||||
torrent, err := b.client.GetTorrent(torrentID)
|
torrent, err := b.client.GetTorrent(torrentID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.logger.Printf("[ERROR] Failed to get torrent %d for live update: %s", torrentID, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -941,7 +996,11 @@ func (b *Bot) liveUpdateInfo(ctx context.Context, chatID int64, msgID int, torre
|
|||||||
case <-time.After(b.cfg.Interval):
|
case <-time.After(b.cfg.Interval):
|
||||||
}
|
}
|
||||||
|
|
||||||
torrent, _ := b.client.GetTorrent(torrentID)
|
torrent, err := b.client.GetTorrent(torrentID)
|
||||||
|
if err != nil {
|
||||||
|
b.logger.Printf("[ERROR] Failed to get torrent %d for final live update: %s", torrentID, err)
|
||||||
|
return
|
||||||
|
}
|
||||||
info := formatter.FormatTorrentInfoStopped(torrent, trackers)
|
info := formatter.FormatTorrentInfoStopped(torrent, trackers)
|
||||||
// Rate limit before sending
|
// Rate limit before sending
|
||||||
rateLimitWait()
|
rateLimitWait()
|
||||||
@@ -962,6 +1021,7 @@ func (b *Bot) liveUpdateSpeed(ctx context.Context, chatID int64, msgID int) {
|
|||||||
|
|
||||||
stats, err := b.client.GetStats()
|
stats, err := b.client.GetStats()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.logger.Printf("[ERROR] Failed to get stats for live update: %s", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package bot
|
|||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@@ -14,37 +15,63 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
// rateLimiter limits API calls to Telegram (30 messages per second)
|
// rateLimiter limits API calls to Telegram (30 messages per second)
|
||||||
// Uses a channel-based approach to avoid mutex contention
|
// Uses token bucket algorithm for accurate rate limiting
|
||||||
type rateLimiter struct {
|
type rateLimiter struct {
|
||||||
ch chan struct{}
|
tokens float64
|
||||||
|
maxTokens float64
|
||||||
|
refillRate float64 // tokens per second
|
||||||
|
lastRefill time.Time
|
||||||
|
mu sync.Mutex
|
||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// telegramRateLimiter limits API calls to 30 per second
|
// telegramRateLimiter limits API calls to 30 per second
|
||||||
telegramRateLimiter = func() *rateLimiter {
|
telegramRateLimiter = func() *rateLimiter {
|
||||||
rl := &rateLimiter{
|
rl := &rateLimiter{
|
||||||
ch: make(chan struct{}, 1),
|
tokens: 30.0, // Start with full bucket
|
||||||
|
maxTokens: 30.0, // Maximum 30 tokens
|
||||||
|
refillRate: 30.0, // 30 tokens per second
|
||||||
|
lastRefill: time.Now(),
|
||||||
}
|
}
|
||||||
// Start the ticker goroutine
|
|
||||||
go func() {
|
|
||||||
ticker := time.NewTicker(time.Second / 30) // 30 calls per second
|
|
||||||
defer ticker.Stop()
|
|
||||||
for range ticker.C {
|
|
||||||
// Try to send a token, but don't block if channel is full
|
|
||||||
select {
|
|
||||||
case rl.ch <- struct{}{}:
|
|
||||||
default:
|
|
||||||
// Channel is full, skip this tick
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return rl
|
return rl
|
||||||
}()
|
}()
|
||||||
)
|
)
|
||||||
|
|
||||||
// wait waits for the next available slot
|
// wait waits for the next available slot using token bucket algorithm
|
||||||
func (rl *rateLimiter) wait() {
|
func (rl *rateLimiter) wait() {
|
||||||
<-rl.ch
|
rl.mu.Lock()
|
||||||
|
defer rl.mu.Unlock()
|
||||||
|
|
||||||
|
// Refill tokens based on elapsed time
|
||||||
|
now := time.Now()
|
||||||
|
elapsed := now.Sub(rl.lastRefill).Seconds()
|
||||||
|
rl.tokens = min(rl.maxTokens, rl.tokens+elapsed*rl.refillRate)
|
||||||
|
rl.lastRefill = now
|
||||||
|
|
||||||
|
// If we have tokens, consume one and return immediately
|
||||||
|
if rl.tokens >= 1.0 {
|
||||||
|
rl.tokens -= 1.0
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Calculate how long to wait for the next token
|
||||||
|
waitTime := (1.0 - rl.tokens) / rl.refillRate
|
||||||
|
rl.mu.Unlock()
|
||||||
|
|
||||||
|
// Wait for the token to become available
|
||||||
|
time.Sleep(time.Duration(waitTime * float64(time.Second)))
|
||||||
|
|
||||||
|
rl.mu.Lock()
|
||||||
|
rl.tokens = 0.0 // Consume the token
|
||||||
|
rl.lastRefill = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
// min returns the minimum of two float64 values
|
||||||
|
func min(a, b float64) float64 {
|
||||||
|
if a < b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
// rateLimitWait waits for the next available API call slot
|
// rateLimitWait waits for the next available API call slot
|
||||||
@@ -59,10 +86,10 @@ func sendMessage(bot *tgbotapi.BotAPI, chatID int64, text string, markdown bool)
|
|||||||
|
|
||||||
// Set typing action
|
// Set typing action
|
||||||
action := tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping)
|
action := tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping)
|
||||||
bot.Send(action)
|
if _, err := bot.Send(action); err != nil {
|
||||||
|
// Log error but continue - typing action is not critical
|
||||||
// Check the rune count, telegram is limited to 4096 chars per message
|
fmt.Fprintf(os.Stderr, "[WARN] Failed to send typing action: %v\n", err)
|
||||||
msgRuneCount := utf8.RuneCountInString(text)
|
}
|
||||||
|
|
||||||
var lastMsgID int
|
var lastMsgID int
|
||||||
|
|
||||||
@@ -93,7 +120,10 @@ func sendMessage(bot *tgbotapi.BotAPI, chatID int64, text string, markdown bool)
|
|||||||
msg.ParseMode = tgbotapi.ModeMarkdown
|
msg.ParseMode = tgbotapi.ModeMarkdown
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp, err := bot.Send(msg); err == nil {
|
resp, err := bot.Send(msg)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "[ERROR] Failed to send message chunk: %v\n", err)
|
||||||
|
} else {
|
||||||
lastMsgID = resp.MessageID
|
lastMsgID = resp.MessageID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -107,7 +137,10 @@ func sendMessage(bot *tgbotapi.BotAPI, chatID int64, text string, markdown bool)
|
|||||||
msg.ParseMode = tgbotapi.ModeMarkdown
|
msg.ParseMode = tgbotapi.ModeMarkdown
|
||||||
}
|
}
|
||||||
|
|
||||||
if resp, err := bot.Send(msg); err == nil {
|
resp, err := bot.Send(msg)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "[ERROR] Failed to send message: %v\n", err)
|
||||||
|
} else {
|
||||||
lastMsgID = resp.MessageID
|
lastMsgID = resp.MessageID
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -114,9 +114,17 @@ func (m *Monitor) checkCompletions() {
|
|||||||
|
|
||||||
if completed && m.onComplete != nil {
|
if completed && m.onComplete != nil {
|
||||||
// Call callback outside of lock to avoid potential deadlocks
|
// Call callback outside of lock to avoid potential deadlocks
|
||||||
|
// Use recover to protect against panics in callback
|
||||||
m.statesMutex.Unlock()
|
m.statesMutex.Unlock()
|
||||||
m.onComplete(torrent)
|
func() {
|
||||||
m.logger.Printf("[INFO] Torrent completed: %s (ID: %d)", torrent.Name, torrent.ID)
|
defer func() {
|
||||||
|
if r := recover(); r != nil {
|
||||||
|
m.logger.Printf("[ERROR] Panic recovered in completion callback for torrent %d: %v", torrent.ID, r)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
m.onComplete(torrent)
|
||||||
|
m.logger.Printf("[INFO] Torrent completed: %s (ID: %d)", torrent.Name, torrent.ID)
|
||||||
|
}()
|
||||||
m.statesMutex.Lock()
|
m.statesMutex.Lock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user