网站被刷流量保JJ脚本-DMIT sPro这款机子比较耐刷(口子小有好处)

这几天我的小破站流量总是被刷空,先是DMIT维多利亚的500G流量,再次是DMIT品川的500G,大口子的机子不耐刷

直接被刷关机了,都没法看日志

刚开始以为是被攻击了,换了DMIT sPro Creator建站不死鸟这款机器,流量比较多也耐刷(口子比较小)

今天早上看探针,好家伙,又是200G没了。不死鸟比较硬,我看日志,是被一个IP,121.231.179.69 来自常州电信的IP死命刷

为了保小鸡,让claude.ai写了个每小时监控流量的程序,以便手动/自动来ban一些大流量的IP

程序有以下功能:
1、每5秒统计一次,每1小时汇报机器流量使用情况;(telegram通知)
2、每小时有流量大于3G的ip,用iptables自动封禁,并通知;
3、有白名单,可以手动添加需要排除的IP,不受第二条3G的限制;
4、你也可以投喂给ai,让它给你添加自动关机等功能

使用:
1、新建一个文件夹,例如/home/traffic_monitor,将主文件和核心文件放入(traffic_monitor.py、traffic_monitor_core.py)
主文件traffic_monitor.py

#!/usr/bin/env python3
import argparse
import time
from traffic_monitor_core import TrafficMonitor

def main():
    parser = argparse.ArgumentParser(description='VPS流量监控工具')
    parser.add_argument('-i', '--interval', type=int, default=5,
                      help='统计间隔(秒), 默认5秒')
    parser.add_argument('-d', '--debug', action='store_true',
                      help='启用调试模式')
    args = parser.parse_args()
    
    monitor = TrafficMonitor(debug=args.debug)
    
    try:
        monitor.start_tcpdump()
        while True:
            monitor.update_traffic_stats()
            monitor.display_stats()
            time.sleep(args.interval)
    except KeyboardInterrupt:
        print("
监控已停止")
        monitor.cleanup()
    except Exception as e:
        print(f"发生错误: {str(e)}")
        monitor.cleanup()

if __name__ == "__main__":
    main()

核心文件:traffic_monitor_core.py
需要你自己修改的地方有:
第62行,要排除的IP,暨IP白名单
第73行,你的telegram机器人token
第74行,机器人通知对象的id

# traffic_monitor_core.py
import subprocess
import time
import datetime
from collections import defaultdict
from prettytable import PrettyTable
import logging
import threading
import queue
import re
import requests
import json

class TelegramNotifier:
    def __init__(self, token, chat_id):
        self.token = token
        self.chat_id = chat_id
        self.base_url = f"https://api.telegram.org/bot{token}/sendMessage"
        self.last_notification_time = None
        self.notification_interval = 3600  # 1小时通知一次
        
    def send_message(self, message):
        try:
            current_time = time.time()
            if (self.last_notification_time is None or 
                current_time - self.last_notification_time >= self.notification_interval):
                
                payload = {
                    'chat_id': self.chat_id,
                    'text': message,
                    'parse_mode': 'HTML'
                }
                
                response = requests.post(self.base_url, json=payload)
                if response.status_code == 200:
                    self.last_notification_time = current_time
                    return True
                else:
                    logging.error(f"发送Telegram消息失败: {response.text}")
                    return False
                    
        except Exception as e:
            logging.error(f"发送Telegram消息时出错: {str(e)}")
            return False

class TrafficMonitor:
    def __init__(self, debug=False):
        self.ip_traffic = defaultdict(lambda: {
            'bytes_sent': 0, 
            'bytes_recv': 0,
            'last_reset': time.time(),  # 添加流量重置时间
            'banned': False  # 标记IP是否被禁止
        })
        self.start_time = datetime.datetime.now()
        self.packet_queue = queue.Queue()
        self.should_stop = False
        
        # 流量限制配置
        self.TRAFFIC_LIMIT = 3 * 1024 * 1024 * 1024  # 3GB
        self.MONITOR_INTERVAL = 3600  # 1小时
        self.banned_ips = set()  # 记录被封禁的IP
        self.whitelist_ips = {''}  # 白名单IP列表
        self.last_hourly_notification = 0  # 上次小时通知的时间
        
        # 设置日志
        logging.basicConfig(
            level=logging.DEBUG if debug else logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        
        # Telegram配置
        telegram_token = ""
        telegram_chat_id = ""
        self.telegram = TelegramNotifier(telegram_token, telegram_chat_id)
        
        # 启动IP封禁检查线程
        self.start_ban_checker()
        # 启动小时通知线程
        self.start_hourly_notifier()

    def start_ban_checker(self):
        """启动IP封禁检查线程"""
        self.ban_checker_thread = threading.Thread(target=self._check_traffic_limits)
        self.ban_checker_thread.daemon = True
        self.ban_checker_thread.start()

    def start_tcpdump(self):
        """启动tcpdump捕获流量"""
        try:
            cmd = [
                'tcpdump',
                '-n',              # 不解析主机名
                '-q',              # 快速输出
                '-l',              # 行缓冲
                'ip or ip6'        # 捕获IPv4和IPv6
            ]
            
            self.logger.debug(f"启动tcpdump命令: {' '.join(cmd)}")
            
            self.tcpdump_process = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True
            )
            
            # 启动读取线程
            self.reader_thread = threading.Thread(target=self._read_tcpdump_output)
            self.reader_thread.daemon = True
            self.reader_thread.start()
            
        except Exception as e:
            self.logger.error(f"启动tcpdump失败: {str(e)}")
            raise

    def _read_tcpdump_output(self):
        """读取tcpdump输出的线程函数"""
        while not self.should_stop:
            try:
                line = self.tcpdump_process.stdout.readline()
                if not line:
                    break
                if "listening on" not in line and "tcpdump:" not in line:
                    self.logger.debug(f"捕获数据包: {line.strip()}")
                    self.packet_queue.put(line)
            except Exception as e:
                self.logger.error(f"读取tcpdump输出错误: {str(e)}")
                break

    def _check_traffic_limits(self):
        """检查IP流量并进行封禁"""
        while not self.should_stop:
            try:
                current_time = time.time()
                for ip, stats in self.ip_traffic.items():
                    # 跳过已封禁和白名单中的IP
                    if stats['banned'] or ip in self.whitelist_ips:
                        continue
                        
                    # 检查是否需要重置计数器
                    if current_time - stats['last_reset'] >= self.MONITOR_INTERVAL:
                        stats['bytes_sent'] = 0
                        stats['bytes_recv'] = 0
                        stats['last_reset'] = current_time
                        continue
                    
                    # 计算总流量
                    total_traffic = stats['bytes_sent']   stats['bytes_recv']
                    
                    # 如果超过限制,执行封禁
                    if total_traffic > self.TRAFFIC_LIMIT:
                        self.ban_ip(ip, total_traffic)
                
            except Exception as e:
                self.logger.error(f"检查流量限制时出错: {str(e)}")
                
            time.sleep(60)  # 每分钟检查一次

    def ban_ip(self, ip, total_traffic):
        """封禁指定IP"""
        try:
            if ip in self.banned_ips:
                return
                
            # 添加iptables规则
            commands = [
                ['iptables', '-A', 'INPUT', '-s', ip, '-j', 'DROP'],
                ['iptables', '-A', 'OUTPUT', '-d', ip, '-j', 'DROP']
            ]
            
            for cmd in commands:
                result = subprocess.run(cmd, capture_output=True, text=True)
                if result.returncode != 0:
                    self.logger.error(f"执行iptables命令失败: {result.stderr}")
                    return
            
            # 更新状态
            self.ip_traffic[ip]['banned'] = True
            self.banned_ips.add(ip)
            
            # 发送通知
            ban_message = (
                f"🚫 IP封禁通知

"
                f"IP地址: {ip}
"
                f"总流量: {self.format_bytes(total_traffic)}
"
                f"时间范围: {self.MONITOR_INTERVAL//3600}小时
"
                f"触发阈值: {self.format_bytes(self.TRAFFIC_LIMIT)}
"
                f"封禁时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}"
            )
            
            if self.telegram:
                self.telegram.send_message(ban_message)
                
            self.logger.warning(f"IP {ip} 已被封禁,总流量: {self.format_bytes(total_traffic)}")
            
        except Exception as e:
            self.logger.error(f"封禁IP {ip} 时出错: {str(e)}")

    def parse_tcpdump_line(self, line):
        """解析tcpdump输出行"""
        try:
            # 跳过时间戳部分
            if " IP " in line:
                # IPv4格式解析
                parts = line.split(" IP ")[1].strip()
                # 分离源和目标
                src_dst = parts.split(" > ")
                if len(src_dst) != 2:
                    return None
                    
                src_parts = src_dst[0].split(".")
                dst_parts = src_dst[1].split(":")
                
                # 提取IP地址(去掉端口号)
                src_ip = ".".join(src_parts[:4])
                dst_ip = dst_parts[0].strip().split(".")[0:4]
                dst_ip = ".".join(dst_ip)
                
                # 提取长度
                length = 0
                if "length" in line:
                    length = int(line.split("length")[-1].strip())
                elif "tcp" in line:
                    length = int(line.split("tcp")[-1].strip())
                elif "UDP" in line:
                    length = int(line.split("UDP,")[-1].split()[0].strip())
                
                self.logger.debug(f"解析结果: 源IP={src_ip}, 目标IP={dst_ip}, 长度={length}")
                return src_ip, dst_ip, length
                
            elif " IP6 " in line:
                # IPv6格式解析
                parts = line.split(" IP6 ")[1].strip()
                src_dst = parts.split(" > ")
                if len(src_dst) != 2:
                    return None
                    
                src_ip = src_dst[0].split()[0]
                dst_ip = src_dst[1].split()[0]
                
                # 提取长度
                length = 0
                if "length" in line:
                    length = int(line.split("length")[-1].strip())
                
                self.logger.debug(f"解析结果: 源IP={src_ip}, 目标IP={dst_ip}, 长度={length}")
                return src_ip, dst_ip, length
                
            return None
            
        except Exception as e:
            self.logger.error(f"解析行时出错: {str(e)}, 行内容: {line}")
            return None

    def format_bytes(self, bytes):
        """将字节数格式化为人类可读的格式"""
        for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
            if bytes  0:
                if ip in self.whitelist_ips:
                    status = "⭐️ 白名单"
                elif stats['banned']:
                    status = "🚫 已封禁"
                else:
                    status = "✅ 正常"
                message.append(
                    f"{rank}. IP: {ip} ({status})
"
                    f"   ↑ 发送: {self.format_bytes(stats['bytes_sent'])}
"
                    f"   ↓ 接收: {self.format_bytes(stats['bytes_recv'])}
"
                    f"   💡 总计: {self.format_bytes(total)}"
                )
        
        total_traffic = sum(stats['bytes_sent']   stats['bytes_recv'] 
                          for stats in self.ip_traffic.values())
        message.append(f"
📊 总流量: {self.format_bytes(total_traffic)}")
        message.append(f"🌐 监控IP数: {len(self.ip_traffic)}")
        message.append(f"🚫 已封禁IP数: {len(self.banned_ips)}")
        
        return "
".join(message)

    def update_traffic_stats(self):
        """更新流量统计信息"""
        try:
            packets_processed = 0
            while not self.packet_queue.empty():
                line = self.packet_queue.get_nowait()
                result = self.parse_tcpdump_line(line)
                
                if result:
                    src_ip, dst_ip, length = result
                    # 更新统计
                    self.ip_traffic[src_ip]['bytes_sent']  = length
                    self.ip_traffic[dst_ip]['bytes_recv']  = length
                    packets_processed  = 1
                
            if packets_processed > 0:
                self.logger.debug(f"本次更新处理了 {packets_processed} 个数据包")
                    
        except queue.Empty:
            pass
        except Exception as e:
            self.logger.error(f"更新统计信息时出错: {str(e)}")

    def display_stats(self):
        """显示流量统计信息"""
        table = PrettyTable()
        table.field_names = ["排名", "IP地址", "状态", "发送流量", "接收流量", "总流量"]
        
        # 按总流量排序
        sorted_ips = sorted(
            self.ip_traffic.items(),
            key=lambda x: x[1]['bytes_sent']   x[1]['bytes_recv'],
            reverse=True
        )[:10]  # 只取前10名
        
        for rank, (ip, stats) in enumerate(sorted_ips, 1):
            total = stats['bytes_sent']   stats['bytes_recv']
            if total > 0:  # 只显示有流量的IP
                if ip in self.whitelist_ips:
                    status = "⭐️ 白名单"
                elif stats['banned']:
                    status = "🚫 已封禁"
                else:
                    status = "✅ 正常"
                table.add_row([
                    rank,
                    ip,
                    status,
                    self.format_bytes(stats['bytes_sent']),
                    self.format_bytes(stats['bytes_recv']),
                    self.format_bytes(total)
                ])
            
        # 清屏并显示
        print("\033[H\033[J")  # 清屏
        print(f"流量监控统计 (开始于: {self.start_time.strftime('%Y-%m-%d %H:%M:%S')})")
        print(f"当前时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        print(table)
        
        # 显示处理的数据包总数
        total_traffic = sum(stats['bytes_sent']   stats['bytes_recv'] 
                          for stats in self.ip_traffic.values())
        print(f"
总计处理流量: {self.format_bytes(total_traffic)}")
        print(f"监控的IP数量: {len(self.ip_traffic)}")
        print(f"已封禁IP数量: {len(self.banned_ips)}")
        
        # 不再在这里发送 Telegram 通知

    def start_hourly_notifier(self):
        """启动小时通知线程"""
        self.hourly_notifier_thread = threading.Thread(target=self._hourly_notification_task)
        self.hourly_notifier_thread.daemon = True
        self.hourly_notifier_thread.start()

    def _hourly_notification_task(self):
        """小时通知任务"""
        while not self.should_stop:
            current_time = time.time()
            if current_time - self.last_hourly_notification >= 3600:  # 1小时
                self.send_hourly_notification()
                self.last_hourly_notification = current_time
            time.sleep(60)  # 每分钟检查一次

    def send_hourly_notification(self):
        """发送每小时的统计通知"""
        try:
            # 按总流量排序
            sorted_ips = sorted(
                self.ip_traffic.items(),
                key=lambda x: x[1]['bytes_sent']   x[1]['bytes_recv'],
                reverse=True
            )

            message = [
                "🕐 每小时流量统计报告
",
                f"统计时间: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
",
                "📊 TOP 5 流量使用情况:"
            ]

            # 只取前5名
            for rank, (ip, stats) in enumerate(sorted_ips[:5], 1):
                total = stats['bytes_sent']   stats['bytes_recv']
                if total > 0:
                    if ip in self.whitelist_ips:
                        status = "⭐️ 白名单"
                    elif stats['banned']:
                        status = "🚫 已封禁"
                    else:
                        status = "✅ 正常"
                    message.append(
                        f"
{rank}. IP: {ip} ({status})"
                        f"
   ↑ 发送: {self.format_bytes(stats['bytes_sent'])}"
                        f"
   ↓ 接收: {self.format_bytes(stats['bytes_recv'])}"
                        f"
   💡 总计: {self.format_bytes(total)}"
                    )

            # 添加总体统计
            total_traffic = sum(stats['bytes_sent']   stats['bytes_recv'] 
                            for stats in self.ip_traffic.values())
            message.append(f"

📈 系统总流量: {self.format_bytes(total_traffic)}")
            message.append(f"👥 监控IP总数: {len(self.ip_traffic)}")
            message.append(f"⛔️ 已封禁IP数: {len(self.banned_ips)}")

            # 发送消息
            self.telegram.notification_interval = 0  # 确保消息能立即发送
            self.telegram.send_message("
".join(message))
            self.telegram.notification_interval = 3600  # 恢复正常间隔

        except Exception as e:
            self.logger.error(f"发送每小时通知时出错: {str(e)}")

    def cleanup(self):
        """清理资源"""
        self.should_stop = True
        if hasattr(self, 'tcpdump_process'):
            self.tcpdump_process.terminate()
            self.tcpdump_process.wait()

2、新建一个进程保护,运行主文件traffic_monitor.py

nano /etc/systemd/system/traffic_monitor_tg_push.service

粘贴以下进程保护内容,注意地址和你新建的文件夹地址保持一致(我是用虚拟的环境运行,你也可以直接修改为:ExecStart=python3 /home/traffic_monitor/traffic_monitor.py)

[Unit]
Description=Traffic monitor & telegram message push
After=network.target

[Service]
Type=simple
User=root
WorkingDirectory=/home/traffic_monitor
ExecStart=/root/venv/bin/python /home/traffic_monitor/traffic_monitor.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target

chmod x /etc/systemd/system/traffic_monitor_tg_push.service
systemctl start traffic_monitor_tg_push.service
systemctl enable traffic_monitor_tg_push.service
systemctl status traffic_monitor_tg_push.service

本人是程序小白,以上都是ai产生的结果,确实很牛逼。claude.ai在编程的功力比chatgpt强,不枉每个月在他们俩各投入20刀。
之前搬瓦工bigerbox邀请码机器人也是他们俩合作给我弄出来的

论坛内已经有很多高级的脚本可以控制服务器,这个论坛内中国黄金不敢班门弄斧,单纯纯开个贴,保住自己的小鸡不受太多伤害,希望各位大佬轻喷。

点赞
  1. HgTrojan说道:

    也可以通过探针实现,以哪吒为例,可以设置遇到攻击探针触发 触发任务 自动封禁IP(或者关机,这个比较简单)等过一会再自动解除封禁就好了。
    而且遇到攻击探针一般都有通知只不过没有这么详细,但好处就是可以用一个bot监控所有机器

  2. Lian说道:

    @欧巴 #18
    那是更好的机器吧?我没用过高防机 我就买的40刀一年的就来裸奔了 套了cf确实就没有那么丝滑了
    想要丝滑还是把预算给到cf会员上比较好
    不要把d哥看扁了 高防机一样能给你打崩 我最开始小白啥也不懂 被d哥上了几堂课 全懂了
    楼主的这个方法确实是个方法 但双向流量计算的都难逃魔爪 你防不住d哥的机器也多啊 想要搞你 饶过你的规则是迟早的事
    ddos打进来就是下行拉满 下行流量没得到清洗就会耗光你的总流量 顶不住的

  3. at说道:

    @欧巴 #22
    配一下防火墙什么的应该会好点,ufw,fail2ban什么的

    # Limit to 50 concurrent connections on port 80/443 per IP
    -A ufw-before-input -p tcp --syn --dport 80 -m connlimit --connlimit-above 50 -j DROP
    -A ufw-before-input -p tcp --syn --dport 443 -m connlimit --connlimit-above 50 -j DROP
    # Limit to 100 connections on port 80/443 per 2 seconds per IP
    -A ufw-before-input -p tcp --dport 80 -i eth0 -m state --state NEW -m recent --set
    -A ufw-before-input -p tcp --dport 80 -i eth0 -m state --state NEW -m recent --update --seconds 2 --hitcount 100 -j DROP
    -A ufw-before-input -p tcp --dport 443 -i eth0 -m state --state NEW -m recent --set
    -A ufw-before-input -p tcp --dport 443 -i eth0 -m state --state NEW -m recent --update --seconds 2 --hitcount 100 -j DROP
    

    Nginx或者Caddy可以配rate limit和waf
    开源防火墙推荐Crowdsec

    咱站天天被攻击都没事 rt.http3.lol

回复 HgTrojan 取消回复

电子邮件地址不会被公开。必填项已用 * 标注

×
订阅图标按钮