my application requires me to detect multicast streams and react to them. Defining the sockets and igmp is fine but once the stream(s) ends my program is still reacting to packets it “thinks?” sees even though all streams have finished.
I thought maybe the IDE output was getting bogged down but minimum logging and running code from CLI has the same problem.
Ive written this code a few different ways now and THIS issue of having some “queue” of packets having to be processed is messing things up.
Did I miss somthing?
This iteration I’m using the select libray in Python (3.9) to detect if the sockets have data like this
readable, _, _ = select.select(sockets, [], [], 0)
The output never seems to go back to zero i.e. no more active data on sockets … or am I missing the point on how to use selection to detect this traffic.
===============================
import socket
import struct
import time
import select
multicast_addresses = {'224.0.1.10': [50002, 50003,50010,50024]}
sockets = []
active_ports=[]
active_MC ={
"50002":[False],"50003":[False],
"50010":[False],"50024":[False]
}
for multicast_address, ports in multicast_addresses.items():
for port in ports:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
sock.bind((multicast_address, port))
group = socket.inet_aton(multicast_address)
mreq = struct.pack('4sL', group, socket.INADDR_ANY)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
#sock.setblocking(False)
sockets.append(sock)
#print (sockets)
while True:
# List of sockets to monitor for read events
readable, _, _ = select.select(sockets, [], [], 0) # Timeout set to 1 second
print("just after readable", readable)
# If any of the multicast sockets are in the readable list, it means there's incoming data
#print(f'the lenght of readable : {len(readable)} {readable}')
#print(readable,end='/r')
#print (f"streams dected = {len(readable)}")
#print(active_MC)
port_check = '50003'
length = len(readable)
pass
for sock in readable: # Create List of active ports
port = sock.getsockname()[1] # get the name of one of the ports
active_ports.append(port)
print(f"active_ports array = {active_ports} type={type(active_ports)} {type(active_ports[0])}")
#note active_ports get stored as integers
#print("After For loop acive Ports:",active_ports)
#for port_check in active_MC.keys(): # loop through all ports defined
#print ("Ports For loop read ",port_check,type(port_check))
#print(f"Array Port_check = {port_check} ")
# port_check is a str
if int(port_check) in active_ports : # stream is active if true
#print(f"Vars in first test port_check= {port_check} active_MC = {active_MC[str(port_check)][0]}")
if active_MC[port_check][0] == False :# First time detected
print(f"stream started on port: {port_check}nn")
active_MC[port_check][0] = True
print("MAIN: ",active_MC)
else:
# True Events may be ignored or Not
# thinking I may need to time duration of stream
#print("Failed First Test")
pass
else:
print("main if statement was falsen",port_check,active_MC[port_check][0])
# here Port is NOT an active stream so set to false and log
if active_MC[port_check][0] == True :# Stream was live but no longer
print(f"stream STOPPED on port: {port_check}")
active_MC[port_check][0] = False
print("POST: ",active_MC)
else:
print("POST if statement was falsen")
# True Events may be ignored or Not
# thinking I may need to time duration of stream
pass
active_ports.clear() # clear the array for the next text
time.sleep(1)
I expect that if there is no data being received by the socket then the select statement will not return that socket to the list of active sockets that are recieving streams.
I have a seperate box that generates the MC streams on the correct port to generate the events
In a previous generation of this code I setup 10 different threads each listening to one unique socket. I pretty sure that caused a memory leak cause I had crashes every 38 days from a boot (almost exactly) once that was implemented. I figured a more efficient way is required.