So, I have a simple python script which aims at detecting syn flood attacks in a virtual network environment called mininet using scapy.
from scapy.all import *
from collections import defaultdict
import threading
import time
import sys
# Data structures
syn_to_synack = defaultdict(lambda: [0, 0]) # SYN to SYN-ACK ratio tracker
blocked_ips = set() # Blocked IPs
# Thresholds
SYN_SYNACK_RATIO_THRESHOLD = 3 # Lower ratio for easier detection during testing
CHECK_INTERVAL = 1 # Interval in seconds to check thresholds
def monitor_packets(packet):
"""
Function to process sniffed packets and update tracking data.
"""
global blocked_ips
if packet.haslayer(TCP):
tcp = packet[TCP]
ip = packet[IP]
# If the source IP is blocked, drop the packet
if ip.src in blocked_ips:
print(f"[BLOCKED] Dropping packet from {ip.src}", flush=True)
return
# Track SYN packets
if tcp.flags == "S":
syn_to_synack[(ip.src, tcp.dport)][0] += 1 # Increment SYN count
time.sleep(0.01)
print(f"[info] SYN packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
# Track SYN-ACK packets
if tcp.flags == "SA":
syn_to_synack[(ip.src, tcp.sport)][1] += 1 # Increment SYN-ACK count
time.sleep(0.01) # Artificial delay for debugging
print(f"[info] SYN-ACK packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
def sniff_packets():
"""
Function to run sniffing in a separate thread.
"""
print("[*] Starting packet sniffing...", flush=True)
sniff(filter="tcp", prn=monitor_packets, store=False, timeout=0.1)
def check_thresholds():
"""
Function to check thresholds for SYN flood detection and block offending IPs.
"""
global blocked_ips
print(f"[info] Checking thresholds...", flush=True)
print(f"[info] Current syn_to_synack state: {dict(syn_to_synack)}", file=sys.stderr)
print(f"[info] Current blocked IPs: {blocked_ips}", flush=True)
# Check SYN to SYN-ACK ratio
for key, counts in syn_to_synack.items():
syn_count, synack_count = counts
print(f"[info] Evaluating key: {key}, SYN count: {syn_count}, SYN-ACK count: {synack_count}", flush=True)
if synack_count == 0 or (syn_count / synack_count) > SYN_SYNACK_RATIO_THRESHOLD:
print(f"[ALERT] High SYN/SYN-ACK ratio for {key}: {syn_count}/{synack_count}", flush=True)
src_ip = key[0] # Extract the source IP from the key
print(f"[info] src_ip extracted: {src_ip}", flush=True)
print(f"[info] Checking if {src_ip} is already blocked...", flush=True)
if src_ip not in blocked_ips: # Block the source IP based on the ratio
print(f"[ACTION] Blocking IP: {src_ip}", flush=True)
blocked_ips.add(src_ip) # Block the IP
time.sleep(0.1)
else:
print(f"[info] {src_ip} is already blocked.", flush=True)
if __name__ == "__main__":
print("[*] Starting SYN flood detection with enhanced debugging...", flush=True)
try:
# Start sniffing in a separate thread
sniff_thread = threading.Thread(target=sniff_packets, daemon=True)
sniff_thread.start()
# Run the threshold checking loop
while True:
time.sleep(CHECK_INTERVAL)
print("[info] running threshold loop...", flush=True)
check_thresholds()
except KeyboardInterrupt:
print("n[!] Stopping SYN flood detection.", flush=True)
This script is supposed to run in the router, sniffing tcp packets and monitoring the syn/syn-ack ratio to flag those attacks, then blocking the bad source ips. I have tried:
- Adding a time period for the thread that modify the variables to sleep to make sure the other thread would get updated values;
- Flushing the output in the print statements;
- Making the sniffing run in another thread not to interfere with the rest of the program;
- Adding more print statements to help me to track the code flow.
But none of that works. At first I thought the “check_thresholds” function wasn’t getting executed at all, but after seeing my output I found out that it seems more weird than that:
[*] Starting SYN flood detection with dual-flag logic…
[INFO] SYN from 192.168.1.2 to 192.168.2.2:12345
…Message keeps showing, until this message starts to show
[ALERT] High SYN rate detected for 192.168.1.2 (29 SYNs/sec)
…This message keeps showing until I terminate the program
[!] Stopping SYN flood detection.
This is a fairly simple script, but I have been failing to understand why some messages get printed and some don’t. I understand it might be a logical error, but then why some other messages that come before those don’t get printed either way?
fclorenzo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.