Why i’m only seeing certain messages that start with either “[*]”, “[Info]” or [“alert]” in my script in the output?

So, I have a simple python script which aims at detecting syn flood attacks in a virtual network environment called mininet using scapy.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>from scapy.all import *
from collections import defaultdict
import threading
import time
import sys
# Data structures
syn_to_synack = defaultdict(lambda: [0, 0]) # SYN to SYN-ACK ratio tracker
blocked_ips = set() # Blocked IPs
# Thresholds
SYN_SYNACK_RATIO_THRESHOLD = 3 # Lower ratio for easier detection during testing
CHECK_INTERVAL = 1 # Interval in seconds to check thresholds
def monitor_packets(packet):
"""
Function to process sniffed packets and update tracking data.
"""
global blocked_ips
if packet.haslayer(TCP):
tcp = packet[TCP]
ip = packet[IP]
# If the source IP is blocked, drop the packet
if ip.src in blocked_ips:
print(f"[BLOCKED] Dropping packet from {ip.src}", flush=True)
return
# Track SYN packets
if tcp.flags == "S":
syn_to_synack[(ip.src, tcp.dport)][0] += 1 # Increment SYN count
time.sleep(0.01)
print(f"[info] SYN packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
# Track SYN-ACK packets
if tcp.flags == "SA":
syn_to_synack[(ip.src, tcp.sport)][1] += 1 # Increment SYN-ACK count
time.sleep(0.01) # Artificial delay for debugging
print(f"[info] SYN-ACK packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)
def sniff_packets():
"""
Function to run sniffing in a separate thread.
"""
print("[*] Starting packet sniffing...", flush=True)
sniff(filter="tcp", prn=monitor_packets, store=False, timeout=0.1)
def check_thresholds():
"""
Function to check thresholds for SYN flood detection and block offending IPs.
"""
global blocked_ips
print(f"[info] Checking thresholds...", flush=True)
print(f"[info] Current syn_to_synack state: {dict(syn_to_synack)}", file=sys.stderr)
print(f"[info] Current blocked IPs: {blocked_ips}", flush=True)
# Check SYN to SYN-ACK ratio
for key, counts in syn_to_synack.items():
syn_count, synack_count = counts
print(f"[info] Evaluating key: {key}, SYN count: {syn_count}, SYN-ACK count: {synack_count}", flush=True)
if synack_count == 0 or (syn_count / synack_count) > SYN_SYNACK_RATIO_THRESHOLD:
print(f"[ALERT] High SYN/SYN-ACK ratio for {key}: {syn_count}/{synack_count}", flush=True)
src_ip = key[0] # Extract the source IP from the key
print(f"[info] src_ip extracted: {src_ip}", flush=True)
print(f"[info] Checking if {src_ip} is already blocked...", flush=True)
if src_ip not in blocked_ips: # Block the source IP based on the ratio
print(f"[ACTION] Blocking IP: {src_ip}", flush=True)
blocked_ips.add(src_ip) # Block the IP
time.sleep(0.1)
else:
print(f"[info] {src_ip} is already blocked.", flush=True)
if __name__ == "__main__":
print("[*] Starting SYN flood detection with enhanced debugging...", flush=True)
try:
# Start sniffing in a separate thread
sniff_thread = threading.Thread(target=sniff_packets, daemon=True)
sniff_thread.start()
# Run the threshold checking loop
while True:
time.sleep(CHECK_INTERVAL)
print("[info] running threshold loop...", flush=True)
check_thresholds()
except KeyboardInterrupt:
print("n[!] Stopping SYN flood detection.", flush=True)
</code>
<code>from scapy.all import * from collections import defaultdict import threading import time import sys # Data structures syn_to_synack = defaultdict(lambda: [0, 0]) # SYN to SYN-ACK ratio tracker blocked_ips = set() # Blocked IPs # Thresholds SYN_SYNACK_RATIO_THRESHOLD = 3 # Lower ratio for easier detection during testing CHECK_INTERVAL = 1 # Interval in seconds to check thresholds def monitor_packets(packet): """ Function to process sniffed packets and update tracking data. """ global blocked_ips if packet.haslayer(TCP): tcp = packet[TCP] ip = packet[IP] # If the source IP is blocked, drop the packet if ip.src in blocked_ips: print(f"[BLOCKED] Dropping packet from {ip.src}", flush=True) return # Track SYN packets if tcp.flags == "S": syn_to_synack[(ip.src, tcp.dport)][0] += 1 # Increment SYN count time.sleep(0.01) print(f"[info] SYN packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True) print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr) # Track SYN-ACK packets if tcp.flags == "SA": syn_to_synack[(ip.src, tcp.sport)][1] += 1 # Increment SYN-ACK count time.sleep(0.01) # Artificial delay for debugging print(f"[info] SYN-ACK packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True) print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr) def sniff_packets(): """ Function to run sniffing in a separate thread. """ print("[*] Starting packet sniffing...", flush=True) sniff(filter="tcp", prn=monitor_packets, store=False, timeout=0.1) def check_thresholds(): """ Function to check thresholds for SYN flood detection and block offending IPs. """ global blocked_ips print(f"[info] Checking thresholds...", flush=True) print(f"[info] Current syn_to_synack state: {dict(syn_to_synack)}", file=sys.stderr) print(f"[info] Current blocked IPs: {blocked_ips}", flush=True) # Check SYN to SYN-ACK ratio for key, counts in syn_to_synack.items(): syn_count, synack_count = counts print(f"[info] Evaluating key: {key}, SYN count: {syn_count}, SYN-ACK count: {synack_count}", flush=True) if synack_count == 0 or (syn_count / synack_count) > SYN_SYNACK_RATIO_THRESHOLD: print(f"[ALERT] High SYN/SYN-ACK ratio for {key}: {syn_count}/{synack_count}", flush=True) src_ip = key[0] # Extract the source IP from the key print(f"[info] src_ip extracted: {src_ip}", flush=True) print(f"[info] Checking if {src_ip} is already blocked...", flush=True) if src_ip not in blocked_ips: # Block the source IP based on the ratio print(f"[ACTION] Blocking IP: {src_ip}", flush=True) blocked_ips.add(src_ip) # Block the IP time.sleep(0.1) else: print(f"[info] {src_ip} is already blocked.", flush=True) if __name__ == "__main__": print("[*] Starting SYN flood detection with enhanced debugging...", flush=True) try: # Start sniffing in a separate thread sniff_thread = threading.Thread(target=sniff_packets, daemon=True) sniff_thread.start() # Run the threshold checking loop while True: time.sleep(CHECK_INTERVAL) print("[info] running threshold loop...", flush=True) check_thresholds() except KeyboardInterrupt: print("n[!] Stopping SYN flood detection.", flush=True) </code>
from scapy.all import *
from collections import defaultdict
import threading
import time
import sys

# Data structures
syn_to_synack = defaultdict(lambda: [0, 0])  # SYN to SYN-ACK ratio tracker
blocked_ips = set()  # Blocked IPs

# Thresholds
SYN_SYNACK_RATIO_THRESHOLD = 3  # Lower ratio for easier detection during testing
CHECK_INTERVAL = 1  # Interval in seconds to check thresholds


def monitor_packets(packet):
    """
    Function to process sniffed packets and update tracking data.
    """
    global blocked_ips

    if packet.haslayer(TCP):
        tcp = packet[TCP]
        ip = packet[IP]

        # If the source IP is blocked, drop the packet
        if ip.src in blocked_ips:
            print(f"[BLOCKED] Dropping packet from {ip.src}", flush=True)
            return

        # Track SYN packets
        if tcp.flags == "S":
            syn_to_synack[(ip.src, tcp.dport)][0] += 1  # Increment SYN count
            time.sleep(0.01)
            print(f"[info] SYN packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
            print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)

        # Track SYN-ACK packets
        if tcp.flags == "SA":
            syn_to_synack[(ip.src, tcp.sport)][1] += 1  # Increment SYN-ACK count
            time.sleep(0.01)  # Artificial delay for debugging
            print(f"[info] SYN-ACK packet tracked: {ip.src} -> {ip.dst}:{tcp.dport}", flush=True)
            print(f"[info] Updated syn_to_synack: {dict(syn_to_synack)}", file=sys.stderr)


def sniff_packets():
    """
    Function to run sniffing in a separate thread.
    """
    print("[*] Starting packet sniffing...", flush=True)
    sniff(filter="tcp", prn=monitor_packets, store=False, timeout=0.1)


def check_thresholds():
    """
    Function to check thresholds for SYN flood detection and block offending IPs.
    """
    global blocked_ips

    print(f"[info] Checking thresholds...", flush=True)
    print(f"[info] Current syn_to_synack state: {dict(syn_to_synack)}", file=sys.stderr)
    print(f"[info] Current blocked IPs: {blocked_ips}", flush=True)

    # Check SYN to SYN-ACK ratio
    for key, counts in syn_to_synack.items():
        syn_count, synack_count = counts
        print(f"[info] Evaluating key: {key}, SYN count: {syn_count}, SYN-ACK count: {synack_count}", flush=True)
        
        if synack_count == 0 or (syn_count / synack_count) > SYN_SYNACK_RATIO_THRESHOLD:
            print(f"[ALERT] High SYN/SYN-ACK ratio for {key}: {syn_count}/{synack_count}", flush=True)
            src_ip = key[0]  # Extract the source IP from the key
            print(f"[info] src_ip extracted: {src_ip}", flush=True)
            print(f"[info] Checking if {src_ip} is already blocked...", flush=True)
            if src_ip not in blocked_ips:  # Block the source IP based on the ratio
                print(f"[ACTION] Blocking IP: {src_ip}", flush=True)
                blocked_ips.add(src_ip)  # Block the IP
                time.sleep(0.1)
            else:
                print(f"[info] {src_ip} is already blocked.", flush=True)


if __name__ == "__main__":
    print("[*] Starting SYN flood detection with enhanced debugging...", flush=True)
    try:
        # Start sniffing in a separate thread
        sniff_thread = threading.Thread(target=sniff_packets, daemon=True)
        sniff_thread.start()

        # Run the threshold checking loop
        while True:
            time.sleep(CHECK_INTERVAL)
            print("[info] running threshold loop...", flush=True)
            check_thresholds()
    except KeyboardInterrupt:
        print("n[!] Stopping SYN flood detection.", flush=True)

This script is supposed to run in the router, sniffing tcp packets and monitoring the syn/syn-ack ratio to flag those attacks, then blocking the bad source ips. I have tried:

  • Adding a time period for the thread that modify the variables to sleep to make sure the other thread would get updated values;
  • Flushing the output in the print statements;
  • Making the sniffing run in another thread not to interfere with the rest of the program;
  • Adding more print statements to help me to track the code flow.
    But none of that works. At first I thought the “check_thresholds” function wasn’t getting executed at all, but after seeing my output I found out that it seems more weird than that:

[*] Starting SYN flood detection with dual-flag logic…
[INFO] SYN from 192.168.1.2 to 192.168.2.2:12345
…Message keeps showing, until this message starts to show
[ALERT] High SYN rate detected for 192.168.1.2 (29 SYNs/sec)
…This message keeps showing until I terminate the program
[!] Stopping SYN flood detection.
This is a fairly simple script, but I have been failing to understand why some messages get printed and some don’t. I understand it might be a logical error, but then why some other messages that come before those don’t get printed either way?

New contributor

fclorenzo is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật