分享下采集tg频道资源做站 1

使用python的telethon模块登录自己的tg号来进行采集频道
直接上代码

# -*- coding: utf-8 -*-
# @Time    : 2024/1/10
# @File    : get_msg.py
# @Description :

import time
import threading
import queue

import pymysql
from telethon.sync import TelegramClient
import settings
from pymydb import pymydb

api_id = settings.api_id
api_hash = settings.api_hash
chat = settings.alyp_Animation
print('请输入偏移值')
offset_id = int(input())  #

query_queue = queue.Queue()
insert_queue = queue.Queue()


def db_query_worker():
    while True:
        try:
            message = query_queue.get(timeout=1)
        except queue.Empty:
            continue

        if message is None:
            query_queue.task_done()
            break  # None是结束信号

        # 重试逻辑
        max_retries = 5  # 最大重试次数
        retries = 0
        while retries < max_retries:
            try:
                with pymydb() as db:
                    article_id, chat_id, text, send_time = message
                    query = "SELECT * FROM article where chat_id=%s and article_id=%s"
                    param = (chat_id, article_id)
                    results = db.execute_query(query, param)

                    if len(results) == 0:
                        update_query = "INSERT INTO article (id,chat_id,article_id,text,send_time) VALUES (%s,%s,%s,%s,%s)"
                        update_params = (int(time.time() * 1000), chat_id, article_id, text, send_time)
                        insert_queue.put((update_query, update_params))
                        print(f'{article_id} 查询完成加入插入队列 ,当前队列 {query_queue.qsize()}')
                    else:
                        print(f'{article_id} 查询完成重复 ,当前队列 {query_queue.qsize()}')
                    break  # 如果操作成功,跳出循环
            except pymysql.err.OperationalError as e:
                print(f"数据库操作错误: {e}, 正在重试...")
                time.sleep(2)  # 等待一段时间再重试
                retries  = 1
            except Exception as e:
                print(f"发生未知错误: {e}")
                break

        query_queue.task_done()


def db_insert_worker():
    while True:
        try:
            item = insert_queue.get(timeout=1)
        except queue.Empty:
            continue

        if item is None:
            insert_queue.task_done()
            break  # None是结束信号

        # 重试逻辑
        max_retries = 5  # 最大重试次数
        retries = 0
        while retries < max_retries:
            try:
                with pymydb() as db:
                    db.execute_update(*item)
                print(f'{item[1][2]} 插入成功 ,当前队列 {insert_queue.qsize()}')
                break  # 如果操作成功,跳出循环
            except pymysql.err.OperationalError as e:
                print(f"数据库插入操作错误: {e}, 正在重试...")
                time.sleep(2)  # 等待一段时间再重试
                retries  = 1
            except Exception as e:
                print(f"发生未知错误: {e}")
                break

        insert_queue.task_done()


query_threads = []
insert_threads = []

# 启动5个查询线程
for i in range(6):
    thread = threading.Thread(target=db_query_worker)
    thread.start()
    query_threads.append(thread)

# 启动5个插入线程
for i in range(6):
    thread = threading.Thread(target=db_insert_worker)
    thread.start()
    insert_threads.append(thread)


with TelegramClient('user', api_id, api_hash) as client:
    messages = client.iter_messages(chat, limit=1500, offset_id=offset_id)
    for message in messages:
        time.sleep(0.3)
        query_queue.put((message.id, message.sender_id, message.text, message.date))

# 在程序的最后,确保所有线程都已完成
for thread in query_threads   insert_threads:
    thread.join()

# 发送结束信号并等待查询线程结束
query_queue.put(None)

# 发送结束信号并等待插入线程结束
insert_queue.put(None)


简单解释一下 主要代码就是下面的 其他的函数 我就用来写入数据库而已 不需要的可以直接删除
我采集的是网盘资源 图片我直接忽略了 如果要采集图片的话 自行gpt 即可解决
limit参数是 你要采集这个频道多少信息 采集完就自动结束了 我这里是1500
api_id , api_hash 这两个自行在这里申请 https://my.telegram.org/auth?to=apps
如果是用bot机器人的话 就不需要 首次运行代码 需要填写点信息
chat的话就是频道/群组的id 当然也可以用名字 不过不建议 https://t.me/getidsbot 转发一条信息给这个机器人 就可知道id
偏移值就是 他采集的顺序是从最新的消息开始
比如最新消息是 38000条 我到3W条的时候 tg不返回信息了 我偏移值就填30000 重新运行脚本 就可以接着采下去了

api_id = settings.api_id
api_hash = settings.api_hash
chat = settings.alyp_Animation
offset_id = 0


with TelegramClient('user', api_id, api_hash) as client:
    messages = client.iter_messages(chat, limit=1500, offset_id=offset_id)
    for message in messages:
		print(message)
        time.sleep(0.3)
        query_queue.put((message.id, message.sender_id, message.text, message.date))

当然功能不限制于这些
tg真号可以采集频道信息/上传文件/下载文件......
机器人的话不能采集频道信息 可上传下载 其他还没试

我还写有一个自动下载和上传文件/视频 然后转发到指定群聊


还可以实时看到进度 不过速度实在不行 200K 我就放弃了
然后后面改成直接上传文件id 实现秒发 就是我转发视频给机器人 然后机器人发到指定频道/群聊 这样就不会显示转自哪个频道
直接上代码(参考一下 gpt自行修改)

api_id = settings.api_id
api_hash = settings.api_hash
chat = settings.test_chat_id

with TelegramClient('user', api_id, api_hash) as client:
    @client.on(events.NewMessage(chats=chat))
    async def my_event_handler(event):

        print(event.sender_id, ':', event.text)

        # 通过视频id来转发视频
        if event.message.media and isinstance(event.message.media, MessageMediaDocument):
            # 获取文件的id,access_hash和file_reference
            id = event.message.media.document.id
            access_hash = event.message.media.document.access_hash
            file_reference = event.message.media.document.file_reference
            # 创建一个InputDocument对象
            file = InputDocument(id=id, access_hash=access_hash, file_reference=file_reference)
            # 使用InputDocument对象来发送文件
            await client.send_file(chat, file, caption=event.text, )

采了一天信息 7个频道

点赞
  1. hysteria说道:

    牛逼

  2. acform说道:

    666

  3. xianmeng说道:

    http://www.662688.xyz/api/get_zy?keyword=繁花

    简单做了个搜索的接口

发表回复

电子邮件地址不会被公开。必填项已用 * 标注

×
订阅图标按钮