这几天我的小破站流量总是被刷空,先是DMIT维多利亚的500G流量,再次是DMIT品川的500G,大口子的机子不耐刷
直接被刷关机了,都没法看日志
刚开始以为是被攻击了,换了DMIT sPro Creator建站不死鸟这款机器,流量比较多也耐刷(口子比较小)
今天早上看探针,好家伙,又是200G没了。不死鸟比较硬,我看日志,是被一个IP,121.231.179.69 来自常州电信的IP死命刷
为了保小鸡,让claude.ai写了个每小时监控流量的程序,以便手动/自动来ban一些大流量的IP
程序有以下功能:
⭐1、每5秒统计一次,每1小时汇报机器流量使用情况;(telegram通知)
⭐2、每小时有流量大于3G的ip,用iptables自动封禁,并通知;
⭐3、有白名单,可以手动添加需要排除的IP,不受第二条3G的限制;
⭐4、你也可以投喂给ai,让它给你添加自动关机等功能
使用:
1、新建一个文件夹,例如/home/traffic_monitor,将主文件和核心文件放入(traffic_monitor.py、traffic_monitor_core.py)
✅主文件traffic_monitor.py
#!/usr/bin/env python3
import argparse
import time
from traffic_monitor_core import TrafficMonitor
def main():
parser = argparse.ArgumentParser(description='VPS流量监控工具')
parser.add_argument('-i', '--interval', type=int, default=5,
help='统计间隔(秒), 默认5秒')
parser.add_argument('-d', '--debug', action='store_true',
help='启用调试模式')
args = parser.parse_args()
monitor = TrafficMonitor(debug=args.debug)
try:
monitor.start_tcpdump()
while True:
monitor.update_traffic_stats()
monitor.display_stats()
time.sleep(args.interval)
except KeyboardInterrupt:
print("
监控已停止")
monitor.cleanup()
except Exception as e:
print(f"发生错误: {str(e)}")
monitor.cleanup()
if __name__ == "__main__":
main()
✅核心文件:traffic_monitor_core.py
需要你自己修改的地方有:
第62行,要排除的IP,暨IP白名单
第73行,你的telegram机器人token
第74行,机器人通知对象的id
# traffic_monitor_core.py
import subprocess
import time
import datetime
from collections import defaultdict
from prettytable import PrettyTable
import logging
import threading
import queue
import re
import requests
import json
class TelegramNotifier:
def __init__(self, token, chat_id):
self.token = token
self.chat_id = chat_id
self.base_url = f"https://api.telegram.org/bot{token}/sendMessage"
self.last_notification_time = None
self.notification_interval = 3600 # 1小时通知一次
def send_message(self, message):
try:
current_time = time.time()
if (self.last_notification_time is None or
current_time - self.last_notification_time >= self.notification_interval):
payload = {
'chat_id': self.chat_id,
'text': message,
'parse_mode': 'HTML'
}
response = requests.post(self.base_url, json=payload)
if response.status_code == 200:
self.last_notification_time = current_time
return True
else:
logging.error(f"发送Telegram消息失败: {response.text}")
return False
except Exception as e:
logging.error(f"发送Telegram消息时出错: {str(e)}")
return False
class TrafficMonitor:
def __init__(self, debug=False):
self.ip_traffic = defaultdict(lambda: {
'bytes_sent': 0,
'bytes_recv': 0,
'last_reset': time.time(), # 添加流量重置时间
'banned': False # 标记IP是否被禁止
})
self.start_time = datetime.datetime.now()
self.packet_queue = queue.Queue()
self.should_stop = False
# 流量限制配置
self.TRAFFIC_LIMIT = 3 * 1024 * 1024 * 1024 # 3GB
self.MONITOR_INTERVAL = 3600 # 1小时
self.banned_ips = set() # 记录被封禁的IP
self.whitelist_ips = {''} # 白名单IP列表
self.last_hourly_notification = 0 # 上次小时通知的时间
# 设置日志
logging.basicConfig(
level=logging.DEBUG if debug else logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
self.logger = logging.getLogger(__name__)
# Telegram配置
telegram_token = ""
telegram_chat_id = ""
self.telegram = TelegramNotifier(telegram_token, telegram_chat_id)
# 启动IP封禁检查线程
self.start_ban_checker()
# 启动小时通知线程
self.start_hourly_notifier()
def start_ban_checker(self):
"""启动IP封禁检查线程"""
self.ban_checker_thread = threading.Thread(target=self._check_traffic_limits)
self.ban_checker_thread.daemon = True
self.ban_checker_thread.start()
def start_tcpdump(self):
"""启动tcpdump捕获流量"""
try:
cmd = [
'tcpdump',
'-n', # 不解析主机名
'-q', # 快速输出
'-l', # 行缓冲
'ip or ip6' # 捕获IPv4和IPv6
]
self.logger.debug(f"启动tcpdump命令: {' '.join(cmd)}")
self.tcpdump_process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
# 启动读取线程
self.reader_thread = threading.Thread(target=self._read_tcpdump_output)
self.reader_thread.daemon = True
self.reader_thread.start()
except Exception as e:
self.logger.error(f"启动tcpdump失败: {str(e)}")
raise
def _read_tcpdump_output(self):
"""读取tcpdump输出的线程函数"""
while not self.should_stop:
try:
line = self.tcpdump_process.stdout.readline()
if not line:
break
if "listening on" not in line and "tcpdump:" not in line:
self.logger.debug(f"捕获数据包: {line.strip()}")
self.packet_queue.put(line)
except Exception as e:
self.logger.error(f"读取tcpdump输出错误: {str(e)}")
break
def _check_traffic_limits(self):
"""检查IP流量并进行封禁"""
while not self.should_stop:
try:
current_time = time.time()
for ip, stats in self.ip_traffic.items():
# 跳过已封禁和白名单中的IP
if stats['banned'] or ip in self.whitelist_ips:
continue
# 检查是否需要重置计数器
if current_time - stats['last_reset'] >= self.MONITOR_INTERVAL:
stats['bytes_sent'] = 0
stats['bytes_recv'] = 0
stats['last_reset'] = current_time
continue
# 计算总流量
total_traffic = stats['bytes_sent'] stats['bytes_recv']
# 如果超过限制,执行封禁
if total_traffic > self.TRAFFIC_LIMIT:
self.ban_ip(ip, total_traffic)
except Exception as e:
self.logger.error(f"检查流量限制时出错: {str(e)}")
time.sleep(60) # 每分钟检查一次
def ban_ip(self, ip, total_traffic):
"""封禁指定IP"""
try:
if ip in self.banned_ips:
return
# 添加iptables规则
commands = [
['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'],
['iptables', '-A', 'OUTPUT', '-d', ip, '-j', 'DROP']
]
for cmd in commands:
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode != 0:
self.logger.error(f"执行iptables命令失败: {result.stderr}")
return
# 更新状态
self.ip_traffic[ip]['banned'] = True
self.banned_ips.add(ip)
# 发送通知
ban_message = (
f"🚫 IP封禁通知
"
f"IP地址: {ip}
"
f"总流量: {self.format_bytes(total_traffic)}
"
f"时间范围: {self.MONITOR_INTERVAL//3600}小时
"
f"触发阈值: {self.format_bytes(self.TRAFFIC_LIMIT)}
"
f"封禁时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
)
if self.telegram:
self.telegram.send_message(ban_message)
self.logger.warning(f"IP {ip} 已被封禁,总流量: {self.format_bytes(total_traffic)}")
except Exception as e:
self.logger.error(f"封禁IP {ip} 时出错: {str(e)}")
def parse_tcpdump_line(self, line):
"""解析tcpdump输出行"""
try:
# 跳过时间戳部分
if " IP " in line:
# IPv4格式解析
parts = line.split(" IP ")[1].strip()
# 分离源和目标
src_dst = parts.split(" > ")
if len(src_dst) != 2:
return None
src_parts = src_dst[0].split(".")
dst_parts = src_dst[1].split(":")
# 提取IP地址(去掉端口号)
src_ip = ".".join(src_parts[:4])
dst_ip = dst_parts[0].strip().split(".")[0:4]
dst_ip = ".".join(dst_ip)
# 提取长度
length = 0
if "length" in line:
length = int(line.split("length")[-1].strip())
elif "tcp" in line:
length = int(line.split("tcp")[-1].strip())
elif "UDP" in line:
length = int(line.split("UDP,")[-1].split()[0].strip())
self.logger.debug(f"解析结果: 源IP={src_ip}, 目标IP={dst_ip}, 长度={length}")
return src_ip, dst_ip, length
elif " IP6 " in line:
# IPv6格式解析
parts = line.split(" IP6 ")[1].strip()
src_dst = parts.split(" > ")
if len(src_dst) != 2:
return None
src_ip = src_dst[0].split()[0]
dst_ip = src_dst[1].split()[0]
# 提取长度
length = 0
if "length" in line:
length = int(line.split("length")[-1].strip())
self.logger.debug(f"解析结果: 源IP={src_ip}, 目标IP={dst_ip}, 长度={length}")
return src_ip, dst_ip, length
return None
except Exception as e:
self.logger.error(f"解析行时出错: {str(e)}, 行内容: {line}")
return None
def format_bytes(self, bytes):
"""将字节数格式化为人类可读的格式"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if bytes 0:
if ip in self.whitelist_ips:
status = "⭐️ 白名单"
elif stats['banned']:
status = "🚫 已封禁"
else:
status = "✅ 正常"
message.append(
f"{rank}. IP: {ip} ({status})
"
f" ↑ 发送: {self.format_bytes(stats['bytes_sent'])}
"
f" ↓ 接收: {self.format_bytes(stats['bytes_recv'])}
"
f" 💡 总计: {self.format_bytes(total)}"
)
total_traffic = sum(stats['bytes_sent'] stats['bytes_recv']
for stats in self.ip_traffic.values())
message.append(f"
📊 总流量: {self.format_bytes(total_traffic)}")
message.append(f"🌐 监控IP数: {len(self.ip_traffic)}")
message.append(f"🚫 已封禁IP数: {len(self.banned_ips)}")
return "
".join(message)
def update_traffic_stats(self):
"""更新流量统计信息"""
try:
packets_processed = 0
while not self.packet_queue.empty():
line = self.packet_queue.get_nowait()
result = self.parse_tcpdump_line(line)
if result:
src_ip, dst_ip, length = result
# 更新统计
self.ip_traffic[src_ip]['bytes_sent'] = length
self.ip_traffic[dst_ip]['bytes_recv'] = length
packets_processed = 1
if packets_processed > 0:
self.logger.debug(f"本次更新处理了 {packets_processed} 个数据包")
except queue.Empty:
pass
except Exception as e:
self.logger.error(f"更新统计信息时出错: {str(e)}")
def display_stats(self):
"""显示流量统计信息"""
table = PrettyTable()
table.field_names = ["排名", "IP地址", "状态", "发送流量", "接收流量", "总流量"]
# 按总流量排序
sorted_ips = sorted(
self.ip_traffic.items(),
key=lambda x: x[1]['bytes_sent'] x[1]['bytes_recv'],
reverse=True
)[:10] # 只取前10名
for rank, (ip, stats) in enumerate(sorted_ips, 1):
total = stats['bytes_sent'] stats['bytes_recv']
if total > 0: # 只显示有流量的IP
if ip in self.whitelist_ips:
status = "⭐️ 白名单"
elif stats['banned']:
status = "🚫 已封禁"
else:
status = "✅ 正常"
table.add_row([
rank,
ip,
status,
self.format_bytes(stats['bytes_sent']),
self.format_bytes(stats['bytes_recv']),
self.format_bytes(total)
])
# 清屏并显示
print("\033[H\033[J") # 清屏
print(f"流量监控统计 (开始于: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')})")
print(f"当前时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(table)
# 显示处理的数据包总数
total_traffic = sum(stats['bytes_sent'] stats['bytes_recv']
for stats in self.ip_traffic.values())
print(f"
总计处理流量: {self.format_bytes(total_traffic)}")
print(f"监控的IP数量: {len(self.ip_traffic)}")
print(f"已封禁IP数量: {len(self.banned_ips)}")
# 不再在这里发送 Telegram 通知
def start_hourly_notifier(self):
"""启动小时通知线程"""
self.hourly_notifier_thread = threading.Thread(target=self._hourly_notification_task)
self.hourly_notifier_thread.daemon = True
self.hourly_notifier_thread.start()
def _hourly_notification_task(self):
"""小时通知任务"""
while not self.should_stop:
current_time = time.time()
if current_time - self.last_hourly_notification >= 3600: # 1小时
self.send_hourly_notification()
self.last_hourly_notification = current_time
time.sleep(60) # 每分钟检查一次
def send_hourly_notification(self):
"""发送每小时的统计通知"""
try:
# 按总流量排序
sorted_ips = sorted(
self.ip_traffic.items(),
key=lambda x: x[1]['bytes_sent'] x[1]['bytes_recv'],
reverse=True
)
message = [
"🕐 每小时流量统计报告
",
f"统计时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
",
"📊 TOP 5 流量使用情况:"
]
# 只取前5名
for rank, (ip, stats) in enumerate(sorted_ips[:5], 1):
total = stats['bytes_sent'] stats['bytes_recv']
if total > 0:
if ip in self.whitelist_ips:
status = "⭐️ 白名单"
elif stats['banned']:
status = "🚫 已封禁"
else:
status = "✅ 正常"
message.append(
f"
{rank}. IP: {ip} ({status})"
f"
↑ 发送: {self.format_bytes(stats['bytes_sent'])}"
f"
↓ 接收: {self.format_bytes(stats['bytes_recv'])}"
f"
💡 总计: {self.format_bytes(total)}"
)
# 添加总体统计
total_traffic = sum(stats['bytes_sent'] stats['bytes_recv']
for stats in self.ip_traffic.values())
message.append(f"
📈 系统总流量: {self.format_bytes(total_traffic)}")
message.append(f"👥 监控IP总数: {len(self.ip_traffic)}")
message.append(f"⛔️ 已封禁IP数: {len(self.banned_ips)}")
# 发送消息
self.telegram.notification_interval = 0 # 确保消息能立即发送
self.telegram.send_message("
".join(message))
self.telegram.notification_interval = 3600 # 恢复正常间隔
except Exception as e:
self.logger.error(f"发送每小时通知时出错: {str(e)}")
def cleanup(self):
"""清理资源"""
self.should_stop = True
if hasattr(self, 'tcpdump_process'):
self.tcpdump_process.terminate()
self.tcpdump_process.wait()
2、新建一个进程保护,运行主文件traffic_monitor.py
nano /etc/systemd/system/traffic_monitor_tg_push.service
粘贴以下进程保护内容,注意地址和你新建的文件夹地址保持一致(我是用虚拟的环境运行,你也可以直接修改为:ExecStart=python3 /home/traffic_monitor/traffic_monitor.py)
[Unit]
Description=Traffic monitor & telegram message push
After=network.target
[Service]
Type=simple
User=root
WorkingDirectory=/home/traffic_monitor
ExecStart=/root/venv/bin/python /home/traffic_monitor/traffic_monitor.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
chmod x /etc/systemd/system/traffic_monitor_tg_push.service
systemctl start traffic_monitor_tg_push.service
systemctl enable traffic_monitor_tg_push.service
systemctl status traffic_monitor_tg_push.service
本人是程序小白,以上都是ai产生的结果,确实很牛逼。claude.ai在编程的功力比chatgpt强,不枉每个月在他们俩各投入20刀。
之前搬瓦工bigerbox邀请码机器人也是他们俩合作给我弄出来的
论坛内已经有很多高级的脚本可以控制服务器,这个论坛内中国黄金不敢班门弄斧,单纯纯开个贴,保住自己的小鸡不受太多伤害,希望各位大佬轻喷。


也可以通过探针实现,以哪吒为例,可以设置遇到攻击探针触发 触发任务 自动封禁IP(或者关机,这个比较简单)等过一会再自动解除封禁就好了。
而且遇到攻击探针一般都有通知只不过没有这么详细,但好处就是可以用一个bot监控所有机器
@欧巴 #18
那是更好的机器吧?我没用过高防机 我就买的40刀一年的就来裸奔了 套了cf确实就没有那么丝滑了
想要丝滑还是把预算给到cf会员上比较好
不要把d哥看扁了 高防机一样能给你打崩 我最开始小白啥也不懂 被d哥上了几堂课 全懂了
楼主的这个方法确实是个方法 但双向流量计算的都难逃魔爪 你防不住d哥的机器也多啊 想要搞你 饶过你的规则是迟早的事
ddos打进来就是下行拉满 下行流量没得到清洗就会耗光你的总流量 顶不住的
@欧巴 #22
配一下防火墙什么的应该会好点,ufw,fail2ban什么的
Nginx或者Caddy可以配rate limit和waf
开源防火墙推荐Crowdsec
咱站天天被攻击都没事 rt.http3.lol