使用python的telethon模块登录自己的tg号来进行采集频道
直接上代码
# -*- coding: utf-8 -*-
# @Time : 2024/1/10
# @File : get_msg.py
# @Description :
import time
import threading
import queue
import pymysql
from telethon.sync import TelegramClient
import settings
from pymydb import pymydb
api_id = settings.api_id
api_hash = settings.api_hash
chat = settings.alyp_Animation
print('请输入偏移值')
offset_id = int(input()) #
query_queue = queue.Queue()
insert_queue = queue.Queue()
def db_query_worker():
while True:
try:
message = query_queue.get(timeout=1)
except queue.Empty:
continue
if message is None:
query_queue.task_done()
break # None是结束信号
# 重试逻辑
max_retries = 5 # 最大重试次数
retries = 0
while retries < max_retries:
try:
with pymydb() as db:
article_id, chat_id, text, send_time = message
query = "SELECT * FROM article where chat_id=%s and article_id=%s"
param = (chat_id, article_id)
results = db.execute_query(query, param)
if len(results) == 0:
update_query = "INSERT INTO article (id,chat_id,article_id,text,send_time) VALUES (%s,%s,%s,%s,%s)"
update_params = (int(time.time() * 1000), chat_id, article_id, text, send_time)
insert_queue.put((update_query, update_params))
print(f'{article_id} 查询完成加入插入队列 ,当前队列 {query_queue.qsize()}')
else:
print(f'{article_id} 查询完成重复 ,当前队列 {query_queue.qsize()}')
break # 如果操作成功,跳出循环
except pymysql.err.OperationalError as e:
print(f"数据库操作错误: {e}, 正在重试...")
time.sleep(2) # 等待一段时间再重试
retries = 1
except Exception as e:
print(f"发生未知错误: {e}")
break
query_queue.task_done()
def db_insert_worker():
while True:
try:
item = insert_queue.get(timeout=1)
except queue.Empty:
continue
if item is None:
insert_queue.task_done()
break # None是结束信号
# 重试逻辑
max_retries = 5 # 最大重试次数
retries = 0
while retries < max_retries:
try:
with pymydb() as db:
db.execute_update(*item)
print(f'{item[1][2]} 插入成功 ,当前队列 {insert_queue.qsize()}')
break # 如果操作成功,跳出循环
except pymysql.err.OperationalError as e:
print(f"数据库插入操作错误: {e}, 正在重试...")
time.sleep(2) # 等待一段时间再重试
retries = 1
except Exception as e:
print(f"发生未知错误: {e}")
break
insert_queue.task_done()
query_threads = []
insert_threads = []
# 启动5个查询线程
for i in range(6):
thread = threading.Thread(target=db_query_worker)
thread.start()
query_threads.append(thread)
# 启动5个插入线程
for i in range(6):
thread = threading.Thread(target=db_insert_worker)
thread.start()
insert_threads.append(thread)
with TelegramClient('user', api_id, api_hash) as client:
messages = client.iter_messages(chat, limit=1500, offset_id=offset_id)
for message in messages:
time.sleep(0.3)
query_queue.put((message.id, message.sender_id, message.text, message.date))
# 在程序的最后,确保所有线程都已完成
for thread in query_threads insert_threads:
thread.join()
# 发送结束信号并等待查询线程结束
query_queue.put(None)
# 发送结束信号并等待插入线程结束
insert_queue.put(None)
简单解释一下 主要代码就是下面的 其他的函数 我就用来写入数据库而已 不需要的可以直接删除
我采集的是网盘资源 图片我直接忽略了 如果要采集图片的话 自行gpt 即可解决
limit参数是 你要采集这个频道多少信息 采集完就自动结束了 我这里是1500
api_id , api_hash 这两个自行在这里申请 https://my.telegram.org/auth?to=apps
如果是用bot机器人的话 就不需要 首次运行代码 需要填写点信息
chat的话就是频道/群组的id 当然也可以用名字 不过不建议 https://t.me/getidsbot 转发一条信息给这个机器人 就可知道id
偏移值就是 他采集的顺序是从最新的消息开始
比如最新消息是 38000条 我到3W条的时候 tg不返回信息了 我偏移值就填30000 重新运行脚本 就可以接着采下去了
api_id = settings.api_id
api_hash = settings.api_hash
chat = settings.alyp_Animation
offset_id = 0
with TelegramClient('user', api_id, api_hash) as client:
messages = client.iter_messages(chat, limit=1500, offset_id=offset_id)
for message in messages:
print(message)
time.sleep(0.3)
query_queue.put((message.id, message.sender_id, message.text, message.date))
当然功能不限制于这些
tg真号可以采集频道信息/上传文件/下载文件......
机器人的话不能采集频道信息 可上传下载 其他还没试
我还写有一个自动下载和上传文件/视频 然后转发到指定群聊


还可以实时看到进度 不过速度实在不行 200K 我就放弃了
然后后面改成直接上传文件id 实现秒发 就是我转发视频给机器人 然后机器人发到指定频道/群聊 这样就不会显示转自哪个频道
直接上代码(参考一下 gpt自行修改)
api_id = settings.api_id
api_hash = settings.api_hash
chat = settings.test_chat_id
with TelegramClient('user', api_id, api_hash) as client:
@client.on(events.NewMessage(chats=chat))
async def my_event_handler(event):
print(event.sender_id, ':', event.text)
# 通过视频id来转发视频
if event.message.media and isinstance(event.message.media, MessageMediaDocument):
# 获取文件的id,access_hash和file_reference
id = event.message.media.document.id
access_hash = event.message.media.document.access_hash
file_reference = event.message.media.document.file_reference
# 创建一个InputDocument对象
file = InputDocument(id=id, access_hash=access_hash, file_reference=file_reference)
# 使用InputDocument对象来发送文件
await client.send_file(chat, file, caption=event.text, )


牛逼
666
http://www.662688.xyz/api/get_zy?keyword=繁花
简单做了个搜索的接口