Рефакторинг Telegram-бота для внутреннего управления chat ID и улучшения обработки сообщений. Добавлен метод уведомления о завершении и реализована защита от превышения лимитов API-запросов. Улучшена обработка регулярных выражений с кэшированием и оптимизировано использование буфера для форматирования сообщений.

This commit is contained in:
Struchkov Mark
2025-12-04 21:10:12 +03:00
parent 1d6090e0f2
commit 07576cd523
7 changed files with 229 additions and 64 deletions

View File

@@ -75,20 +75,14 @@ func main() {
// Initialize monitor
mon := monitor.NewMonitor(cachedClient, log, config.DefaultPollInterval)
// chatID will be set when user sends first message
var chatID int64
// Create bot instance first
telegramBot := bot.NewBot(bot, cachedClient, cfg, log, updates)
telegramBot := bot.NewBot(bot, cachedClient, cfg, log, updates, mon)
// Set up completion callback - chatID will be set by bot when user sends first message
var chatID int64
mon.SetOnComplete(func(torrent *transmission.Torrent) {
msg := fmt.Sprintf("✅ Completed: %s", torrent.Name)
if chatID != 0 {
// Send via bot helper
telegramBot.SendMessage(chatID, msg, false)
}
// Send via bot helper - bot will handle chatID internally
telegramBot.SendCompletionNotification(msg)
})
// Start monitoring

View File

@@ -3,10 +3,12 @@ package bot
import (
"context"
"strings"
"sync"
tgbotapi "gopkg.in/telegram-bot-api.v4"
"transmission-telegram/internal/config"
"transmission-telegram/internal/logger"
"transmission-telegram/internal/monitor"
transmissionClient "transmission-telegram/internal/transmission"
)
@@ -17,16 +19,20 @@ type Bot struct {
cfg *config.Config
logger *logger.Logger
updates <-chan tgbotapi.Update
monitor *monitor.Monitor
chatID int64
chatMu sync.RWMutex
}
// NewBot creates a new bot instance
func NewBot(api *tgbotapi.BotAPI, client transmissionClient.Client, cfg *config.Config, log *logger.Logger, updates <-chan tgbotapi.Update) *Bot {
func NewBot(api *tgbotapi.BotAPI, client transmissionClient.Client, cfg *config.Config, log *logger.Logger, updates <-chan tgbotapi.Update, mon *monitor.Monitor) *Bot {
return &Bot{
api: api,
client: client,
cfg: cfg,
logger: log,
updates: updates,
monitor: mon,
}
}
@@ -47,6 +53,27 @@ func (b *Bot) SendMessage(chatID int64, text string, markdown bool) int {
return sendMessage(b.api, chatID, text, markdown)
}
// SendCompletionNotification sends a completion notification to the registered chat
func (b *Bot) SendCompletionNotification(msg string) {
b.chatMu.RLock()
chatID := b.chatID
b.chatMu.RUnlock()
if chatID != 0 {
b.SendMessage(chatID, msg, false)
}
}
// safeHandler wraps a handler function with panic recovery
func (b *Bot) safeHandler(handler func()) {
defer func() {
if r := recover(); r != nil {
b.logger.Printf("[ERROR] Panic recovered in handler: %v", r)
}
}()
handler()
}
// handleUpdate processes a Telegram update
func (b *Bot) handleUpdate(update tgbotapi.Update) {
if update.Message == nil {
@@ -59,6 +86,21 @@ func (b *Bot) handleUpdate(update tgbotapi.Update) {
return
}
// Update chatID for completion notifications
b.chatMu.Lock()
if b.chatID != update.Message.Chat.ID {
b.chatID = update.Message.Chat.ID
if b.monitor != nil {
b.monitor.SetChatID(update.Message.Chat.ID)
}
}
b.chatMu.Unlock()
// Validate message text
if update.Message.Text == "" {
return
}
// Tokenize the update
tokens := strings.Split(update.Message.Text, " ")
@@ -69,70 +111,70 @@ func (b *Bot) handleUpdate(update tgbotapi.Update) {
command := strings.ToLower(tokens[0])
// Route to appropriate handler
// Route to appropriate handler with panic recovery
switch command {
case "list", "/list", "li", "/li", "/ls", "ls":
go b.handleList(update, tokens[1:])
go b.safeHandler(func() { b.handleList(update, tokens[1:]) })
case "head", "/head", "he", "/he":
go b.handleHead(update, tokens[1:])
go b.safeHandler(func() { b.handleHead(update, tokens[1:]) })
case "tail", "/tail", "ta", "/ta":
go b.handleTail(update, tokens[1:])
go b.safeHandler(func() { b.handleTail(update, tokens[1:]) })
case "downs", "/downs", "dg", "/dg":
go b.handleDowns(update)
go b.safeHandler(func() { b.handleDowns(update) })
case "seeding", "/seeding", "sd", "/sd":
go b.handleSeeding(update)
go b.safeHandler(func() { b.handleSeeding(update) })
case "paused", "/paused", "pa", "/pa":
go b.handlePaused(update)
go b.safeHandler(func() { b.handlePaused(update) })
case "checking", "/checking", "ch", "/ch":
go b.handleChecking(update)
go b.safeHandler(func() { b.handleChecking(update) })
case "active", "/active", "ac", "/ac":
go b.handleActive(update)
go b.safeHandler(func() { b.handleActive(update) })
case "errors", "/errors", "er", "/er":
go b.handleErrors(update)
go b.safeHandler(func() { b.handleErrors(update) })
case "sort", "/sort", "so", "/so":
go b.handleSort(update, tokens[1:])
go b.safeHandler(func() { b.handleSort(update, tokens[1:]) })
case "trackers", "/trackers", "tr", "/tr":
go b.handleTrackers(update)
go b.safeHandler(func() { b.handleTrackers(update) })
case "downloaddir", "dd":
go b.handleDownloadDir(update, tokens[1:])
go b.safeHandler(func() { b.handleDownloadDir(update, tokens[1:]) })
case "add", "/add", "ad", "/ad":
go b.handleAdd(update, tokens[1:])
go b.safeHandler(func() { b.handleAdd(update, tokens[1:]) })
case "search", "/search", "se", "/se":
go b.handleSearch(update, tokens[1:])
go b.safeHandler(func() { b.handleSearch(update, tokens[1:]) })
case "latest", "/latest", "la", "/la":
go b.handleLatest(update, tokens[1:])
go b.safeHandler(func() { b.handleLatest(update, tokens[1:]) })
case "info", "/info", "in", "/in":
go b.handleInfo(update, tokens[1:])
go b.safeHandler(func() { b.handleInfo(update, tokens[1:]) })
case "stop", "/stop", "sp", "/sp":
go b.handleStop(update, tokens[1:])
go b.safeHandler(func() { b.handleStop(update, tokens[1:]) })
case "start", "/start", "st", "/st":
go b.handleStart(update, tokens[1:])
go b.safeHandler(func() { b.handleStart(update, tokens[1:]) })
case "check", "/check", "ck", "/ck":
go b.handleCheck(update, tokens[1:])
go b.safeHandler(func() { b.handleCheck(update, tokens[1:]) })
case "stats", "/stats", "sa", "/sa":
go b.handleStats(update)
go b.safeHandler(func() { b.handleStats(update) })
case "downlimit", "dl":
go b.handleDownLimit(update, tokens[1:])
go b.safeHandler(func() { b.handleDownLimit(update, tokens[1:]) })
case "uplimit", "ul":
go b.handleUpLimit(update, tokens[1:])
go b.safeHandler(func() { b.handleUpLimit(update, tokens[1:]) })
case "speed", "/speed", "ss", "/ss":
go b.handleSpeed(update)
go b.safeHandler(func() { b.handleSpeed(update) })
case "count", "/count", "co", "/co":
go b.handleCount(update)
go b.safeHandler(func() { b.handleCount(update) })
case "del", "/del", "rm", "/rm":
go b.handleDel(update, tokens[1:])
go b.safeHandler(func() { b.handleDel(update, tokens[1:]) })
case "deldata", "/deldata":
go b.handleDelData(update, tokens[1:])
go b.safeHandler(func() { b.handleDelData(update, tokens[1:]) })
case "help", "/help":
go b.sendMessage(update.Message.Chat.ID, HelpText, true)
go b.safeHandler(func() { b.sendMessage(update.Message.Chat.ID, HelpText, true) })
case "version", "/version", "ver", "/ver":
go b.handleVersion(update)
go b.safeHandler(func() { b.handleVersion(update) })
case "":
// might be a file received
go b.handleReceiveTorrent(update)
go b.safeHandler(func() { b.handleReceiveTorrent(update) })
default:
// no such command
go b.sendMessage(update.Message.Chat.ID, "No such command, try /help", false)
go b.safeHandler(func() { b.sendMessage(update.Message.Chat.ID, "No such command, try /help", false) })
}
}

View File

@@ -6,6 +6,7 @@ import (
"regexp"
"strconv"
"strings"
"sync"
"time"
"github.com/dustin/go-humanize"
@@ -16,7 +17,48 @@ import (
"transmission-telegram/pkg/utils"
)
var trackerRegex = regexp.MustCompile(`[https?|udp]://([^:/]*)`)
var (
trackerRegex = regexp.MustCompile(`[https?|udp]://([^:/]*)`)
// bufferPool is a pool for reusing bytes.Buffer instances
bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
},
}
// regexCache caches compiled regular expressions
regexCache = sync.Map{}
)
// getCompiledRegex gets or compiles a regex pattern with caching
func getCompiledRegex(pattern string) (*regexp.Regexp, error) {
// Check cache first
if cached, ok := regexCache.Load(pattern); ok {
return cached.(*regexp.Regexp), nil
}
// Compile and cache
regx, err := regexp.Compile("(?i)" + pattern)
if err != nil {
return nil, err
}
regexCache.Store(pattern, regx)
return regx, nil
}
// getBuffer gets a buffer from the pool
func getBuffer() *bytes.Buffer {
return bufferPool.Get().(*bytes.Buffer)
}
// putBuffer returns a buffer to the pool after resetting it
func putBuffer(buf *bytes.Buffer) {
buf.Reset()
bufferPool.Put(buf)
}
// rateLimitWait waits for the next available API call slot
// This function is defined in helpers.go to access the rate limiter
// Handler methods for bot commands - these are placeholders that need full implementation
// For now, they provide basic functionality
@@ -30,7 +72,7 @@ func (b *Bot) handleList(update tgbotapi.Update, tokens []string) {
buf := new(bytes.Buffer)
if len(tokens) != 0 {
regx, err := regexp.Compile("(?i)" + tokens[0])
regx, err := getCompiledRegex(tokens[0])
if err != nil {
b.sendMessage(update.Message.Chat.ID, "*list:* "+err.Error(), false)
return
@@ -395,7 +437,7 @@ func (b *Bot) handleSearch(update tgbotapi.Update, tokens []string) {
}
query := strings.Join(tokens, " ")
regx, err := regexp.Compile("(?i)" + query)
regx, err := getCompiledRegex(query)
if err != nil {
b.sendMessage(update.Message.Chat.ID, "*search:* "+err.Error(), false)
return
@@ -795,14 +837,21 @@ func (b *Bot) liveUpdateTorrents(chatID int64, msgID int, filter func(transmissi
}
filtered := filter(torrents)
buf := new(bytes.Buffer)
buf := getBuffer()
defer putBuffer(buf)
for i := range filtered {
buf.WriteString(formatter(&filtered[i]))
}
// Rate limit before sending
rateLimitWait()
editConf := tgbotapi.NewEditMessageText(chatID, msgID, buf.String())
editConf.ParseMode = tgbotapi.ModeMarkdown
b.api.Send(editConf)
if _, err := b.api.Send(editConf); err != nil {
b.logger.Printf("[ERROR] Failed to update message: %s", err)
return
}
}
}
@@ -814,30 +863,43 @@ func (b *Bot) liveUpdateActive(chatID int64, msgID int) {
continue
}
buf := new(bytes.Buffer)
buf := getBuffer()
for i := range torrents {
if torrents[i].RateDownload > 0 || torrents[i].RateUpload > 0 {
buf.WriteString(formatter.FormatTorrentDetailed(&torrents[i]))
}
}
// Rate limit before sending
rateLimitWait()
editConf := tgbotapi.NewEditMessageText(chatID, msgID, buf.String())
editConf.ParseMode = tgbotapi.ModeMarkdown
b.api.Send(editConf)
if _, err := b.api.Send(editConf); err != nil {
b.logger.Printf("[ERROR] Failed to update message: %s", err)
putBuffer(buf)
return
}
putBuffer(buf)
}
time.Sleep(b.cfg.Interval)
torrents, _ := b.client.GetTorrents()
buf := new(bytes.Buffer)
buf := getBuffer()
defer putBuffer(buf)
for i := range torrents {
if torrents[i].RateDownload > 0 || torrents[i].RateUpload > 0 {
buf.WriteString(formatter.FormatTorrentActiveStopped(&torrents[i]))
}
}
// Rate limit before sending
rateLimitWait()
editConf := tgbotapi.NewEditMessageText(chatID, msgID, buf.String())
editConf.ParseMode = tgbotapi.ModeMarkdown
b.api.Send(editConf)
if _, err := b.api.Send(editConf); err != nil {
b.logger.Printf("[ERROR] Failed to update message: %s", err)
}
}
func (b *Bot) liveUpdateInfo(chatID int64, msgID int, torrentID int, trackers string) {
@@ -849,17 +911,26 @@ func (b *Bot) liveUpdateInfo(chatID int64, msgID int, torrentID int, trackers st
}
info := formatter.FormatTorrentInfo(torrent, trackers)
// Rate limit before sending
rateLimitWait()
editConf := tgbotapi.NewEditMessageText(chatID, msgID, info)
editConf.ParseMode = tgbotapi.ModeMarkdown
b.api.Send(editConf)
if _, err := b.api.Send(editConf); err != nil {
b.logger.Printf("[ERROR] Failed to update message: %s", err)
return
}
}
time.Sleep(b.cfg.Interval)
torrent, _ := b.client.GetTorrent(torrentID)
info := formatter.FormatTorrentInfoStopped(torrent, trackers)
// Rate limit before sending
rateLimitWait()
editConf := tgbotapi.NewEditMessageText(chatID, msgID, info)
editConf.ParseMode = tgbotapi.ModeMarkdown
b.api.Send(editConf)
if _, err := b.api.Send(editConf); err != nil {
b.logger.Printf("[ERROR] Failed to update message: %s", err)
}
}
func (b *Bot) liveUpdateSpeed(chatID int64, msgID int) {
@@ -871,13 +942,22 @@ func (b *Bot) liveUpdateSpeed(chatID int64, msgID int) {
}
msg := fmt.Sprintf("↓ %s ↑ %s", humanize.Bytes(stats.DownloadSpeed), humanize.Bytes(stats.UploadSpeed))
// Rate limit before sending
rateLimitWait()
editConf := tgbotapi.NewEditMessageText(chatID, msgID, msg)
b.api.Send(editConf)
if _, err := b.api.Send(editConf); err != nil {
b.logger.Printf("[ERROR] Failed to update message: %s", err)
return
}
time.Sleep(b.cfg.Interval)
}
time.Sleep(b.cfg.Interval)
editConf := tgbotapi.NewEditMessageText(chatID, msgID, "↓ - B ↑ - B")
b.api.Send(editConf)
// Rate limit before sending
rateLimitWait()
editConf := tgbotapi.NewEditMessageText(chatID, msgID, "↓ - ↑ -")
if _, err := b.api.Send(editConf); err != nil {
b.logger.Printf("[ERROR] Failed to update message: %s", err)
}
}

View File

@@ -4,6 +4,8 @@ import (
"bytes"
"fmt"
"strconv"
"sync"
"time"
"unicode/utf8"
"github.com/pyed/transmission"
@@ -11,8 +13,36 @@ import (
"transmission-telegram/internal/config"
)
// rateLimiter limits API calls to Telegram (30 messages per second)
type rateLimiter struct {
ticker *time.Ticker
mu sync.Mutex
}
var (
// telegramRateLimiter limits API calls to 30 per second
telegramRateLimiter = &rateLimiter{
ticker: time.NewTicker(time.Second / 30), // 30 calls per second
}
)
// wait waits for the next available slot
func (rl *rateLimiter) wait() {
rl.mu.Lock()
defer rl.mu.Unlock()
<-rl.ticker.C
}
// rateLimitWait waits for the next available API call slot
func rateLimitWait() {
telegramRateLimiter.wait()
}
// sendMessage sends a message to Telegram, splitting if necessary
func sendMessage(bot *tgbotapi.BotAPI, chatID int64, text string, markdown bool) int {
// Rate limit: wait before sending
telegramRateLimiter.wait()
// Set typing action
action := tgbotapi.NewChatAction(chatID, tgbotapi.ChatTyping)
bot.Send(action)
@@ -22,12 +52,15 @@ func sendMessage(bot *tgbotapi.BotAPI, chatID int64, text string, markdown bool)
var lastMsgID int
// Convert to runes for proper UTF-8 handling
runes := []rune(text)
// Split message if too long
for msgRuneCount > config.TelegramMaxMessageLength {
for len(runes) > config.TelegramMaxMessageLength {
stop := config.TelegramMaxMessageLength - 1
// Find the last newline before the limit
for stop > 0 && text[stop] != '\n' {
for stop > 0 && runes[stop] != '\n' {
stop--
}
@@ -36,10 +69,10 @@ func sendMessage(bot *tgbotapi.BotAPI, chatID int64, text string, markdown bool)
stop = config.TelegramMaxMessageLength - 1
}
chunk := text[:stop]
text = text[stop:]
msgRuneCount = utf8.RuneCountInString(text)
chunk := string(runes[:stop+1])
runes = runes[stop+1:]
telegramRateLimiter.wait()
msg := tgbotapi.NewMessage(chatID, chunk)
msg.DisableWebPagePreview = true
if markdown {
@@ -52,8 +85,9 @@ func sendMessage(bot *tgbotapi.BotAPI, chatID int64, text string, markdown bool)
}
// Send remaining text
if len(text) > 0 {
msg := tgbotapi.NewMessage(chatID, text)
if len(runes) > 0 {
telegramRateLimiter.wait()
msg := tgbotapi.NewMessage(chatID, string(runes))
msg.DisableWebPagePreview = true
if markdown {
msg.ParseMode = tgbotapi.ModeMarkdown

View File

@@ -66,7 +66,7 @@ func FormatTorrentInfo(torrent *transmission.Torrent, trackers string) string {
// FormatTorrentInfoStopped formats torrent info when live updates are stopped
func FormatTorrentInfoStopped(torrent *transmission.Torrent, trackers string) string {
torrentName := utils.EscapeMarkdown(torrent.Name)
return fmt.Sprintf("`<%d>` *%s*\n%s *%s* of *%s* (*%.1f%%*) ↓ *- B* ↑ *- B* R: *%s*\nDL: *%s* UP: *%s*\nAdded: *%s*, ETA: *-*\nTrackers: `%s`",
return fmt.Sprintf("`<%d>` *%s*\n%s *%s* of *%s* (*%.1f%%*) ↓ *-* ↑ *-* R: *%s*\nDL: *%s* UP: *%s*\nAdded: *%s*, ETA: *-*\nTrackers: `%s`",
torrent.ID, torrentName, torrent.TorrentStatus(),
humanize.Bytes(torrent.Have()),
humanize.Bytes(torrent.SizeWhenDone),

View File

@@ -86,6 +86,13 @@ func (m *Monitor) checkCompletions() {
m.statesMutex.Lock()
defer m.statesMutex.Unlock()
// Create a set of current torrent IDs
currentTorrentIDs := make(map[int]bool, len(torrents))
for _, torrent := range torrents {
currentTorrentIDs[torrent.ID] = true
}
// Check each torrent for completion and update states
for _, torrent := range torrents {
prevState, exists := m.states[torrent.ID]
@@ -121,5 +128,12 @@ func (m *Monitor) checkCompletions() {
delete(m.states, torrent.ID)
}
}
// Clean up states for torrents that no longer exist
for torrentID := range m.states {
if !currentTorrentIDs[torrentID] {
delete(m.states, torrentID)
}
}
}

View File

@@ -182,9 +182,10 @@ func (c *CachedClient) ExecuteAddCommand(cmd transmission.AddCommand) (*transmis
return torrent, err
}
// SetSort sets sort type
// SetSort sets sort type and invalidates cache since it changes the order
func (c *CachedClient) SetSort(sort transmission.SortType) {
c.client.SetSort(sort)
c.InvalidateCache()
}
// Version returns transmission version