# TG-forward-videos-plus3

import asyncio
import os
import random
import logging
import hashlib
from typing import List, Dict

import aiosqlite
from telethon import TelegramClient
from telethon.tl.types import MessageMediaDocument, DocumentAttributeVideo, Message
from telethon.errors import (
    FloodWaitError, 
    SecurityError, 
    FileReferenceExpiredError,
    rpcerrorlist
)
from dotenv import load_dotenv

# Populate os.environ from a .env file (python-dotenv) so the os.getenv()
# calls below also see values defined there.
load_dotenv()

# ==================== 🛠️ Config-reading helpers ====================
def get_env_float(key, default=0.0):
    """Read environment variable ``key`` as a float.

    Returns ``default`` when the variable is unset or empty. When it is set
    but cannot be parsed as a float, a warning is printed and ``default`` is
    returned instead.
    """
    val = os.getenv(key, "")
    if val:
        try:
            return float(val)
        except ValueError:
            print(f"⚠️ 配置错误: {key} 必须是数字,当前为 '{val}',已重置为 {default}")
    return default

def get_env_int(key, default=0):
    """Read environment variable ``key`` as an int.

    Returns ``default`` when the variable is unset or empty. When it is set
    but is not a valid integer, a warning is printed (mirroring
    get_env_float) and ``default`` is returned, so a typo in e.g. API_ID is
    reported instead of silently becoming the default.
    """
    val = os.getenv(key, "")
    if not val:
        return default
    try:
        return int(val)
    except ValueError:
        print(f"⚠️ 配置错误: {key} 必须是整数,当前为 '{val}',已重置为 {default}")
        return default

# ==================== ⚙️ Basic configuration ====================
# Telegram API credentials, read from the environment / .env file.
API_ID = get_env_int("API_ID")
API_HASH = os.getenv("API_HASH")
PHONE_NUMBER = os.getenv("PHONE_NUMBER")

# 🔍 Video filters. A value of 0 disables that bound. Sizes are in MB;
# durations are compared against the video attribute's duration (seconds).
MIN_SIZE_MB = get_env_float("MIN_SIZE_MB", 0)
MAX_SIZE_MB = get_env_float("MAX_SIZE_MB", 0)
MIN_DURATION = get_env_int("MIN_DURATION", 0)
MAX_DURATION = get_env_int("MAX_DURATION", 0)

# 🛡️ Scan safety limits. Overridable via environment; defaults are unchanged.
# Stop scanning a channel after this many consecutive already-forwarded videos.
STOP_AFTER_DUPLICATES = get_env_int("STOP_AFTER_DUPLICATES", 100)
# Maximum number of history messages scanned per channel (prevents unbounded scans).
MAX_SCAN_COUNT = get_env_int("MAX_SCAN_COUNT", 10000)

# ⏱️ Rate control, in seconds. Overridable via environment; defaults are unchanged.
# The pause after each successful send is drawn uniformly from [MIN_INTERVAL, MAX_INTERVAL].
MIN_INTERVAL = get_env_float("MIN_INTERVAL", 2)
MAX_INTERVAL = get_env_float("MAX_INTERVAL", 5)
# How long an album's timer waits for further parts before the album is flushed.
ALBUM_WAIT_TIME = get_env_float("ALBUM_WAIT_TIME", 3.0)

# ==================== 📋 Task list ====================
TASK_CONFIG = [
    # Each task forwards new videos from every channel in "sources" to the
    # channel in "target". "limit" caps how many new videos are collected per
    # source channel (run_single_task falls back to 200 when it is omitted).
    #
    # Example task:
    # {
    #     "sources": [-1002101568388],
    #     "target": -1002976532877,
    #     "limit": 100,
    # },
    {
        "sources": [-1001532416024],
        "target": -1003613293007,
        "limit": 4000,
    },
]

# ==================== Logging setup ====================
# Root logging: INFO and above, formatted as "HH:MM:SS - message".
logging.basicConfig(
    format="%(asctime)s - %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)
# Named logger used throughout this script.
logger = logging.getLogger("AutoBot")

# Shared Telethon client. "user_session" is the session name Telethon uses to
# persist the login between runs.
client = TelegramClient("user_session", API_ID, API_HASH)

# Per-task mutable state; run_single_task() rebinds all of these at the start
# of every task.
forward_queue = None  # asyncio.Queue of message batches (lists); a None item tells worker() to stop
pending_albums = {}  # grouped_id -> messages of that album collected so far
album_timers = {}  # grouped_id -> asyncio.Task that flushes the album after ALBUM_WAIT_TIME
db = None  # AsyncDB recording keys of videos that were already forwarded

# ==================== 🗄️ Database ====================
class AsyncDB:
    """Tiny async SQLite store of video keys that were already forwarded.

    seen() and mark() serialise their queries on the shared connection
    through ``self.lock``.
    """

    def __init__(self, path):
        self.path = path  # database file passed to aiosqlite.connect()
        self.conn = None  # opened by connect()
        self.lock = asyncio.Lock()

    async def connect(self):
        """Open the database and create the ``videos`` table if it is missing."""
        self.conn = await aiosqlite.connect(self.path)
        ddl = "CREATE TABLE IF NOT EXISTS videos (video_key TEXT PRIMARY KEY)"
        await self.conn.execute(ddl)
        await self.conn.commit()

    async def close(self):
        """Close the connection if connect() opened one."""
        if not self.conn:
            return
        await self.conn.close()

    async def seen(self, key):
        """Return True when ``key`` is already recorded."""
        sql = "SELECT 1 FROM videos WHERE video_key=?"
        async with self.lock:
            async with self.conn.execute(sql, (key,)) as cur:
                row = await cur.fetchone()
        return row is not None

    async def mark(self, key):
        """Record ``key`` and commit; an already-present key is ignored."""
        sql = "INSERT OR IGNORE INTO videos (video_key) VALUES (?)"
        async with self.lock:
            await self.conn.execute(sql, (key,))
            await self.conn.commit()

# ==================== 🛠️ Core helpers ====================

def get_unique_key(msg: Message) -> str:
    """Build the de-duplication key "<chat_id>_<document_id>" for a video message.

    A document seen again in the same chat (same document id) yields the same
    key, which is what the scanner checks against the database.
    """
    chat = msg.chat_id
    document_id = msg.media.document.id
    return f"{chat}_{document_id}"

def generate_safe_filename(sources: List[int], target: int) -> str:
    """Derive the per-task SQLite filename from the source and target ids.

    Ids are written without their sign. When the joined source ids exceed 50
    characters, only the first two ids are kept plus an 8-char MD5 digest of
    the full list, so the name stays short.

    Note: the short form is "<sources>to<target>.db" (no underscores around
    "to") while the hashed form uses "_to_". Existing database files depend
    on these exact names, so the inconsistency is intentional.
    """
    parts = [str(abs(src)) for src in sources]
    joined = "_".join(parts)
    dest = str(abs(target))

    if len(joined) <= 50:
        return f"{joined}to{dest}.db"

    digest = hashlib.md5(joined.encode()).hexdigest()[:8]
    head = "_".join(parts[:2])
    return f"{head}_etc_{digest}_to_{dest}.db"

def is_video(msg: Message) -> bool:
    """Return True if ``msg`` carries a video that passes the configured filters.

    A message qualifies when all of the following hold:

    * its media is a document whose MIME type starts with "video";
    * it is not a round video note;
    * its size in MB is within MIN_SIZE_MB / MAX_SIZE_MB;
    * when a video attribute is present, its duration is within
      MIN_DURATION / MAX_DURATION.

    A bound of 0 disables that check.
    """
    media = msg.media
    if not media or not isinstance(media, MessageMediaDocument):
        return False

    doc = media.document
    # 1. Format check. MessageMediaDocument.document is optional: Telegram
    # omits it e.g. for expired self-destructing media, and it may be a
    # DocumentEmpty without mime_type/attributes. Skip those instead of
    # raising AttributeError and aborting the whole channel scan.
    mime_type = getattr(doc, "mime_type", None)
    if not mime_type or not mime_type.startswith("video"):
        return False

    # 2. Exclude round video notes.
    video_attr = next(
        (a for a in doc.attributes if isinstance(a, DocumentAttributeVideo)), None
    )
    if getattr(video_attr, "round_message", False):
        return False

    # 3. Size check.
    size_mb = doc.size / (1024.0 * 1024.0)
    if MIN_SIZE_MB > 0 and size_mb < MIN_SIZE_MB:
        logger.debug("skip: video too small (%.2f MB < %s MB)", size_mb, MIN_SIZE_MB)
        return False
    if MAX_SIZE_MB > 0 and size_mb > MAX_SIZE_MB:
        logger.debug("skip: video too large (%.2f MB > %s MB)", size_mb, MAX_SIZE_MB)
        return False

    # 4. Duration check, only when Telegram supplied a video attribute.
    if video_attr is not None:
        duration = video_attr.duration
        if MIN_DURATION > 0 and duration < MIN_DURATION:
            return False
        if MAX_DURATION > 0 and duration > MAX_DURATION:
            return False

    return True

# ==================== 🔄 Async logic ====================

async def process_album_later(grouped_id):
    """Flush album ``grouped_id`` onto the forward queue after ALBUM_WAIT_TIME.

    The delay gives the album's remaining parts time to be added to
    ``pending_albums``. If the timer is cancelled (queue_message restarts it
    whenever another part arrives), the coroutine exits quietly.
    """
    try:
        await asyncio.sleep(ALBUM_WAIT_TIME)
        if grouped_id not in pending_albums:
            return
        album = pending_albums.pop(grouped_id)
        album_timers.pop(grouped_id, None)
        if not album:
            return
        album.sort(key=lambda m: m.id)
        # The whole album travels through the queue as a single batch.
        await forward_queue.put(album)
    except asyncio.CancelledError:
        pass

async def queue_message(message: Message):
    """Enqueue ``message`` for forwarding, either as a single message or as an album part.

    A message without a grouped_id goes onto the queue immediately as a
    one-element batch. An album part is buffered in ``pending_albums`` and
    the album's flush timer is restarted.
    """
    gid = message.grouped_id
    if not gid:
        # Wrap single messages in a list so the worker always sees one shape.
        await forward_queue.put([message])
        return

    pending_albums.setdefault(gid, []).append(message)

    # Restart this album's flush timer.
    old_timer = album_timers.get(gid)
    if old_timer is not None:
        old_timer.cancel()
    album_timers[gid] = asyncio.create_task(process_album_later(gid))

async def _send_batch(target_channel_id, batch, max_flood_retries):
    """Send one batch to the target and return True only if it was delivered.

    On FloodWaitError, sleep for the requested time and retry the same batch,
    at most ``max_flood_retries`` extra times, instead of dropping it. Any
    other send error is logged and the batch is skipped.
    """
    files = [m.media for m in batch]
    caption = batch[0].text or ""
    for _attempt in range(max_flood_retries + 1):
        try:
            await client.send_file(target_channel_id, files, caption=caption)
            return True
        except FloodWaitError as e:
            logger.warning(f"⏳ 触发流控 (FloodWait): 暂停 {e.seconds} 秒")
            await asyncio.sleep(e.seconds + 2)
        except FileReferenceExpiredError:
            logger.error("❌ 文件引用过期,跳过")
            return False
        except Exception as e:
            logger.error(f"❌ 发送异常: {e}")
            return False
    logger.error(f"❌ 多次触发流控,放弃本批 ({len(batch)} 条)")
    return False


async def worker(target_channel_id, max_flood_retries=3):
    """Consumer: forward each queued batch of messages to ``target_channel_id``.

    Each queue item is a list of messages: one message, or all parts of an
    album. A ``None`` item stops the worker. After a batch is delivered:

    * every message's key is recorded in ``db``;
    * the worker pauses for a random MIN_INTERVAL..MAX_INTERVAL seconds.

    ``task_done()`` is called in ``finally`` for every item taken, so
    ``forward_queue.join()`` cannot hang on a batch that raised.
    """
    processed_count = 0
    while True:
        batch = await forward_queue.get()
        try:
            if batch is None:
                break
            if not await _send_batch(target_channel_id, batch, max_flood_retries):
                continue
            processed_count += len(batch)
            logger.info(f"✅ 发送成功 | 本次: {len(batch)} | 总计: {processed_count}")

            for m in batch:
                await db.mark(get_unique_key(m))

            await asyncio.sleep(random.uniform(MIN_INTERVAL, MAX_INTERVAL))
        except Exception as e:
            logger.error(f"Worker Error: {e}")
        finally:
            forward_queue.task_done()

def _build_batches(messages):
    """Group messages into forwarding batches in chronological order.

    Messages are ordered oldest first, by date and then by id. Parts of the
    same album (same chat and grouped_id) form one batch, sorted by message
    id, placed where the album's first part appears. Every other message
    becomes a one-element batch.
    """
    ordered = sorted(messages, key=lambda m: (m.date, m.id))
    batches = []
    albums = {}
    for msg in ordered:
        if not msg.grouped_id:
            batches.append([msg])
            continue
        key = (msg.chat_id, msg.grouped_id)
        album = albums.get(key)
        if album is None:
            album = albums[key] = []
            batches.append(album)
        album.append(msg)
    for album in albums.values():
        album.sort(key=lambda m: m.id)
    return batches


async def _scan_channel(ch, forward_limit):
    """Collect up to ``forward_limit`` not-yet-forwarded videos from channel ``ch``.

    Scans at most MAX_SCAN_COUNT messages and stops early after
    STOP_AFTER_DUPLICATES consecutive videos that are already recorded in
    ``db``.
    """
    collected = []
    logger.info(f"🔍 正在扫描: {ch} (目标: {forward_limit})")
    if forward_limit <= 0:
        # Nothing requested; previously one video still slipped through here.
        return collected

    consecutive_duplicates = 0
    scanned_count = 0
    async for msg in client.iter_messages(ch, limit=MAX_SCAN_COUNT):
        scanned_count += 1
        if scanned_count % 200 == 0:
            print(f"\r   ...扫描深度: {scanned_count} 条", end="")

        if not is_video(msg):
            continue

        # Skip videos already recorded in the dedup database.
        if await db.seen(get_unique_key(msg)):
            consecutive_duplicates += 1
            if consecutive_duplicates >= STOP_AFTER_DUPLICATES:
                print(f"\n🛑 [频道 {ch}] 连续 {STOP_AFTER_DUPLICATES} 条重复,停止扫描本频道。")
                break
            continue

        consecutive_duplicates = 0
        collected.append(msg)
        print(f"\r📦 [频道 {ch}] 命中: {len(collected)}/{forward_limit}", end="")

        if len(collected) >= forward_limit:
            print(f"\n✅ [频道 {ch}] 额度已满")
            break
    print("")  # finish the \r progress line
    return collected


async def scanner(source_channels, forward_limit):
    """Producer: scan the source channels and enqueue new videos oldest first.

    Each channel contributes at most ``forward_limit`` new videos. All
    collected videos are grouped into batches, with albums kept together, in
    chronological order and put straight onto ``forward_queue``. A final
    ``None`` tells the worker to stop.

    Albums are grouped here rather than through queue_message()'s timers:
    those timers delayed every album by ALBUM_WAIT_TIME while single videos
    were enqueued immediately. That pushed albums behind newer singles and
    broke the oldest-first order.
    """
    all_collected_videos = []
    for ch in source_channels:
        all_collected_videos.extend(await _scan_channel(ch, forward_limit))

    if all_collected_videos:
        logger.info(f"📊 扫描完成,共找到 {len(all_collected_videos)} 个新视频。正在按时间排序...")
        for batch in _build_batches(all_collected_videos):
            await forward_queue.put(batch)
    else:
        logger.info("⚠️ 没有发现任何新视频。")

    await forward_queue.put(None)  # sentinel: tells worker() to stop

async def run_single_task(task):
    """Run one forwarding task from TASK_CONFIG.

    ``task`` needs "sources" (list of channel ids) and "target" (channel id);
    "limit" (new videos per source channel) defaults to 200. The task:

    * resets the per-task globals (queue, album buffers, dedup DB file named
      by generate_safe_filename);
    * starts the worker;
    * runs the scanner and waits until every queued batch is processed.
    """
    global db, forward_queue, pending_albums, album_timers

    sources = task['sources']
    target = task['target']
    limit = task.get('limit', 200)

    db_name = generate_safe_filename(sources, target)
    logger.info(f"\n🚀 === 启动任务: {db_name} ===")

    # Fresh per-task state.
    forward_queue = asyncio.Queue()
    pending_albums = {}
    album_timers = {}

    db = AsyncDB(db_name)
    await db.connect()

    worker_task = asyncio.create_task(worker(target))

    try:
        await scanner(sources, limit)
        await forward_queue.join()
    except Exception as e:
        # logger.exception also records the traceback.
        logger.exception(f"任务崩溃: {e}")
    finally:
        # Cancel leftover album timers and the worker. Wait for them to finish
        # before closing the DB, so nothing still uses ``db`` after close().
        leftovers = [t for t in album_timers.values() if not t.done()]
        for t in leftovers:
            t.cancel()
        if not worker_task.done():
            worker_task.cancel()
        await asyncio.gather(worker_task, *leftovers, return_exceptions=True)
        await db.close()
        logger.info("🏁 === 任务结束 ===\n")

async def main():
    """Entry point: print the filter banner, log in, then run every task in TASK_CONFIG.

    Tasks run one after another; the client is disconnected in ``finally``
    no matter how the loop ends.
    """
    rule = "=" * 40
    banner = (
        rule,
        "🎥 Auto-Forwarder Pro 启动",
        f"📉 最小限制: {MIN_SIZE_MB} MB",
        f"📈 最大限制: {MAX_SIZE_MB} MB (0 表示不限)",
        rule,
    )
    for line in banner:
        print(line)

    await client.start(PHONE_NUMBER)

    try:
        for task in TASK_CONFIG:
            await run_single_task(task)
    except KeyboardInterrupt:
        # NOTE(review): under asyncio.run on Python 3.11+, Ctrl+C cancels this
        # coroutine and KeyboardInterrupt surfaces from asyncio.run itself, so
        # this branch may never trigger there. Confirm on the target Python.
        print("\n👋 用户停止")
    finally:
        await client.disconnect()


if __name__ == "__main__":
    asyncio.run(main())

# 发表回复
#
# 您的邮箱地址不会被公开。 必填项已用 * 标注