Cannot close correctly aiortc RTCPeerConnection after client disconnect

offer:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>
async def offer(self, request):
try:
params = await request.json()
peer_connection = {
"name": params["name"],
"surname": params["surname"],
"pc": None,
"is_closed": False,
"dc": None,
"uid": uuid.uuid4(),
"audio_track": None,
"audio_track_for_local_use": None,
"audio_blackhole": None,
"video_track": None,
"video_track_for_local_use": None,
"video_blackhole": None,
"offer_in_progress": True,
"call_answered": False,
"manage_call_end_thread": None,
"stop_in_progress": False,
"call_number": None
}
if len(list(self.pcs.keys())) == 3:
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
reserved_call_numbers = list(self.pcs.keys())
if 1 not in reserved_call_numbers:
call_number = 1
elif 2 not in reserved_call_numbers:
call_number = 2
else:
call_number = 3
peer_connection["call_number"] = call_number
self.pcs[call_number] = peer_connection
self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})
timer = 0
while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
if request.transport is None or request.transport.is_closing():
self.to_emitter.send({"type":"transport-error","call-number":call_number})
try:
request.transport.close()
except:
pass
del self.pcs[call_number]
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
timer += 0.1
await asyncio.sleep(0.1)
self.pcs[call_number]["call_answered"] = True
if self.queue.qsize() == 0:
return self.reject_offer(call_number,peer_connection)
else:
data = self.queue.get()
correct_types = ["call-1","call-2","call-3"]
if data["type"] in correct_types:
if (data["call"] == "reject"):
return self.reject_offer(call_number, peer_connection)
elif (data["call"] == "answer"):
while not self.queue.empty():
self.queue.get()
self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))
@self.pcs[call_number]["pc"].on("iceconnectionstatechange")
async def on_ice_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"ICE connection state: {pc.iceConnectionState}")
if pc.iceConnectionState == "failed":
print("ICE connection failed. Attempting to restart ICE.")
await pc.restartIce()
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
if pc.iceConnectionState == "disconnected":
# Wait before handling disconnection
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("connectionstatechange")
async def on_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"Connection state: {pc.connectionState}")
if pc.connectionState in ["failed", "disconnected", "closed"]:
print("Connection failed, disconnected, or closed. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("datachannel")
async def on_datachannel(channel):
self.pcs[call_number]["dc"] = channel
# Send UID to the connecting peer
try:
channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
except Exception as e:
print(f"Error sending UID: {e}")
# Inform about other peers
try:
for uid, pc in self.pcs.items():
if pc['uid'] != peer_connection['uid']:
try:
pc['dc'].send(json.dumps({
"type": "other-uid",
"uid": str(peer_connection['uid']),
"name": peer_connection["name"],
"surname": peer_connection["surname"]
}))
except Exception as e:
print(f"Error sending other-uid message: {e}")
except Exception as e:
print(f"Error informing about other peers: {e}")
@channel.on("message")
async def on_message(message):
try:
message = json.loads(message)
msg_type = message.get("type")
if msg_type == "disconnected":
print('disconnected message from client')
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
elif msg_type in ["offer", "answer", "ice-candidate"]:
target_uid = message["to_uid"]
for uid, pc in self.pcs.items():
if str(pc['uid']) == target_uid:
try:
pc["dc"].send(json.dumps(message))
except Exception as e:
print(f"Error relaying {msg_type}: {e}")
else:
print(f"Unhandled message type: {msg_type}")
except Exception as e:
print(f"Error handling message: {e}")
@channel.on("close")
async def on_close():
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
# audio from server to client
if self.server_audio_stream_offer == None:
self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
#self.server_audio_stream_offer = AudioStreamTrack()
self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)
# video from server to client
if self.server_video_stream_offer is None:
self.server_video_stream_offer = self.create_local_tracks()
self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)
# Attach video from server to QLabel
if self.server_video_track is None:
self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
if self.server_video_blackholde is None:
self.server_video_blackholde = MediaBlackhole()
self.server_video_blackholde.addTrack(self.server_video_track)
await self.server_video_blackholde.start()
@self.pcs[call_number]["pc"].on("track")
async def on_track(track):
if track.kind == "audio":
self.pcs[call_number]["audio_track"] = track
# audio from client (server use)
if call_number == 1:
correct_queue = self.ip_call_1_packet_queue
elif call_number == 2:
correct_queue = self.ip_call_2_packet_queue
else:
correct_queue = self.ip_call_3_packet_queue
self.put_to_q = True
self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
self.pcs[call_number]["audio_blackhole"].addTrack(
self.pcs[call_number]["audio_track_for_local_use"])
await self.pcs[call_number]["audio_blackhole"].start()
else:
self.pcs[call_number]["video_track"] = track
# video from client (server use)
self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
call_number, self)
self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
self.pcs[call_number]["video_blackhole"].addTrack(
self.pcs[call_number]["video_track_for_local_use"])
await self.pcs[call_number]["video_blackhole"].start()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
# handle offer
await self.pcs[call_number]["pc"].setRemoteDescription(offer)
# send answer
answer = await self.pcs[call_number]["pc"].createAnswer()
await self.pcs[call_number]["pc"].setLocalDescription(answer)
return web.Response(content_type="application/json", text=json.dumps(
{"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
"type": self.pcs[call_number]["pc"].localDescription.type}))
else:
return self.reject_offer(call_number, peer_connection)
else:
return self.reject_offer(call_number, peer_connection)
except:
error = traceback.format_exc()
self.to_emitter.send({"type": "error", "error_message": error})
</code>
<code> async def offer(self, request): try: params = await request.json() peer_connection = { "name": params["name"], "surname": params["surname"], "pc": None, "is_closed": False, "dc": None, "uid": uuid.uuid4(), "audio_track": None, "audio_track_for_local_use": None, "audio_blackhole": None, "video_track": None, "video_track_for_local_use": None, "video_blackhole": None, "offer_in_progress": True, "call_answered": False, "manage_call_end_thread": None, "stop_in_progress": False, "call_number": None } if len(list(self.pcs.keys())) == 3: return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""})) reserved_call_numbers = list(self.pcs.keys()) if 1 not in reserved_call_numbers: call_number = 1 elif 2 not in reserved_call_numbers: call_number = 2 else: call_number = 3 peer_connection["call_number"] = call_number self.pcs[call_number] = peer_connection self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']}) timer = 0 while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False): if request.transport is None or request.transport.is_closing(): self.to_emitter.send({"type":"transport-error","call-number":call_number}) try: request.transport.close() except: pass del self.pcs[call_number] return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""})) timer += 0.1 await asyncio.sleep(0.1) self.pcs[call_number]["call_answered"] = True if self.queue.qsize() == 0: return self.reject_offer(call_number,peer_connection) else: data = self.queue.get() correct_types = ["call-1","call-2","call-3"] if data["type"] in correct_types: if (data["call"] == "reject"): return self.reject_offer(call_number, peer_connection) elif (data["call"] == "answer"): while not self.queue.empty(): self.queue.get() self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),])) @self.pcs[call_number]["pc"].on("iceconnectionstatechange") async def on_ice_connection_state_change(): pc = self.pcs[call_number]["pc"] print(f"ICE connection state: {pc.iceConnectionState}") if pc.iceConnectionState == "failed": print("ICE connection failed. Attempting to restart ICE.") await pc.restartIce() await asyncio.sleep(5) # Adjust as needed if pc.iceConnectionState == "disconnected": print("Peer connection lost. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) if pc.iceConnectionState == "disconnected": # Wait before handling disconnection await asyncio.sleep(5) # Adjust as needed if pc.iceConnectionState == "disconnected": print("Peer connection lost. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) @self.pcs[call_number]["pc"].on("connectionstatechange") async def on_connection_state_change(): pc = self.pcs[call_number]["pc"] print(f"Connection state: {pc.connectionState}") if pc.connectionState in ["failed", "disconnected", "closed"]: print("Connection failed, disconnected, or closed. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) @self.pcs[call_number]["pc"].on("datachannel") async def on_datachannel(channel): self.pcs[call_number]["dc"] = channel # Send UID to the connecting peer try: channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])})) except Exception as e: print(f"Error sending UID: {e}") # Inform about other peers try: for uid, pc in self.pcs.items(): if pc['uid'] != peer_connection['uid']: try: pc['dc'].send(json.dumps({ "type": "other-uid", "uid": str(peer_connection['uid']), "name": peer_connection["name"], "surname": peer_connection["surname"] })) except Exception as e: print(f"Error sending other-uid message: {e}") except Exception as e: print(f"Error informing about other peers: {e}") @channel.on("message") async def on_message(message): try: message = json.loads(message) msg_type = message.get("type") if msg_type == "disconnected": print('disconnected message from client') await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) elif msg_type in ["offer", "answer", "ice-candidate"]: target_uid = message["to_uid"] for uid, pc in self.pcs.items(): if str(pc['uid']) == target_uid: try: pc["dc"].send(json.dumps(message)) except Exception as e: print(f"Error relaying {msg_type}: {e}") else: print(f"Unhandled message type: {msg_type}") except Exception as e: print(f"Error handling message: {e}") @channel.on("close") async def on_close(): await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) # audio from server to client if self.server_audio_stream_offer == None: self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter) #self.server_audio_stream_offer = AudioStreamTrack() self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer) # video from server to client if self.server_video_stream_offer is None: self.server_video_stream_offer = self.create_local_tracks() self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer) # Attach video from server to QLabel if self.server_video_track is None: self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter) if self.server_video_blackholde is None: self.server_video_blackholde = MediaBlackhole() self.server_video_blackholde.addTrack(self.server_video_track) await self.server_video_blackholde.start() @self.pcs[call_number]["pc"].on("track") async def on_track(track): if track.kind == "audio": self.pcs[call_number]["audio_track"] = track # audio from client (server use) if call_number == 1: correct_queue = self.ip_call_1_packet_queue elif call_number == 2: correct_queue = self.ip_call_2_packet_queue else: correct_queue = self.ip_call_3_packet_queue self.put_to_q = True self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue) self.pcs[call_number]["audio_blackhole"] = MediaBlackhole() self.pcs[call_number]["audio_blackhole"].addTrack( self.pcs[call_number]["audio_track_for_local_use"]) await self.pcs[call_number]["audio_blackhole"].start() else: self.pcs[call_number]["video_track"] = track # video from client (server use) self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter, call_number, self) self.pcs[call_number]["video_blackhole"] = MediaBlackhole() self.pcs[call_number]["video_blackhole"].addTrack( self.pcs[call_number]["video_track_for_local_use"]) await self.pcs[call_number]["video_blackhole"].start() offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) # handle offer await self.pcs[call_number]["pc"].setRemoteDescription(offer) # send answer answer = await self.pcs[call_number]["pc"].createAnswer() await self.pcs[call_number]["pc"].setLocalDescription(answer) return web.Response(content_type="application/json", text=json.dumps( {"sdp": self.pcs[call_number]["pc"].localDescription.sdp, "type": self.pcs[call_number]["pc"].localDescription.type})) else: return self.reject_offer(call_number, peer_connection) else: return self.reject_offer(call_number, peer_connection) except: error = traceback.format_exc() self.to_emitter.send({"type": "error", "error_message": error}) </code>

    async def offer(self, request):
        try:
            params = await request.json()

            peer_connection = {
                "name": params["name"],
                "surname": params["surname"],
                "pc": None,
                "is_closed": False,
                "dc": None,
                "uid": uuid.uuid4(),
                "audio_track": None,
                "audio_track_for_local_use": None,
                "audio_blackhole": None,
                "video_track": None,
                "video_track_for_local_use": None,
                "video_blackhole": None,
                "offer_in_progress": True,
                "call_answered": False,
                "manage_call_end_thread": None,
                "stop_in_progress": False,
                "call_number": None
            }

            if len(list(self.pcs.keys())) == 3:
                return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))

            reserved_call_numbers = list(self.pcs.keys())

            if 1 not in reserved_call_numbers:
                call_number = 1
            elif 2 not in reserved_call_numbers:
                call_number = 2
            else:
                call_number = 3

            peer_connection["call_number"] = call_number
            self.pcs[call_number] = peer_connection

            self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})

            timer = 0
            while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
                if request.transport is None or request.transport.is_closing():
                    self.to_emitter.send({"type":"transport-error","call-number":call_number})
                    try:
                        request.transport.close()
                    except:
                        pass
                    del self.pcs[call_number]
                    return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
                timer += 0.1
                await asyncio.sleep(0.1)

            self.pcs[call_number]["call_answered"] = True
            if self.queue.qsize() == 0:
                return self.reject_offer(call_number,peer_connection)
            else:
                data = self.queue.get()
                correct_types = ["call-1","call-2","call-3"]
                if data["type"] in correct_types:
                    if (data["call"] == "reject"):
                        return self.reject_offer(call_number, peer_connection)
                    elif (data["call"] == "answer"):
                        while not self.queue.empty():
                            self.queue.get()

                        self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))

                        @self.pcs[call_number]["pc"].on("iceconnectionstatechange")
                        async def on_ice_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"ICE connection state: {pc.iceConnectionState}")

                            if pc.iceConnectionState == "failed":
                                print("ICE connection failed. Attempting to restart ICE.")
                                await pc.restartIce()
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])


                            if pc.iceConnectionState == "disconnected":
                                # Wait before handling disconnection
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("connectionstatechange")
                        async def on_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"Connection state: {pc.connectionState}")
                            if pc.connectionState in ["failed", "disconnected", "closed"]:
                                print("Connection failed, disconnected, or closed. Stopping connection.")
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("datachannel")
                        async def on_datachannel(channel):
                            self.pcs[call_number]["dc"] = channel

                            # Send UID to the connecting peer
                            try:
                                channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
                            except Exception as e:
                                print(f"Error sending UID: {e}")

                            # Inform about other peers
                            try:
                                for uid, pc in self.pcs.items():
                                    if pc['uid'] != peer_connection['uid']:
                                        try:
                                            pc['dc'].send(json.dumps({
                                                "type": "other-uid",
                                                "uid": str(peer_connection['uid']),
                                                "name": peer_connection["name"],
                                                "surname": peer_connection["surname"]
                                            }))
                                        except Exception as e:
                                            print(f"Error sending other-uid message: {e}")
                            except Exception as e:
                                print(f"Error informing about other peers: {e}")

                            @channel.on("message")
                            async def on_message(message):
                                try:
                                    message = json.loads(message)
                                    msg_type = message.get("type")
                                    if msg_type == "disconnected":
                                        print('disconnected message from client')
                                        await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
                                    elif msg_type in ["offer", "answer", "ice-candidate"]:
                                        target_uid = message["to_uid"]
                                        for uid, pc in self.pcs.items():
                                            if str(pc['uid']) == target_uid:
                                                try:
                                                    pc["dc"].send(json.dumps(message))
                                                except Exception as e:
                                                    print(f"Error relaying {msg_type}: {e}")
                                    else:
                                        print(f"Unhandled message type: {msg_type}")
                                except Exception as e:
                                    print(f"Error handling message: {e}")

                            @channel.on("close")
                            async def on_close():
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        # audio from server to client
                        if self.server_audio_stream_offer == None:
                            self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
                            #self.server_audio_stream_offer = AudioStreamTrack()

                        self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)

                        # video from server to client
                        if self.server_video_stream_offer is None:
                            self.server_video_stream_offer = self.create_local_tracks()
                        self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)

                        # Attach video from server to QLabel
                        if self.server_video_track is None:
                            self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
                        if self.server_video_blackholde is None:
                            self.server_video_blackholde = MediaBlackhole()
                            self.server_video_blackholde.addTrack(self.server_video_track)
                            await self.server_video_blackholde.start()

                        @self.pcs[call_number]["pc"].on("track")
                        async def on_track(track):
                            if track.kind == "audio":
                                self.pcs[call_number]["audio_track"] = track
                                # audio from client (server use)
                                if call_number == 1:
                                    correct_queue = self.ip_call_1_packet_queue
                                elif call_number == 2:
                                    correct_queue = self.ip_call_2_packet_queue
                                else:
                                    correct_queue = self.ip_call_3_packet_queue
                                self.put_to_q = True
                                self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
                                self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["audio_blackhole"].addTrack(
                                    self.pcs[call_number]["audio_track_for_local_use"])
                                await self.pcs[call_number]["audio_blackhole"].start()
                            else:
                                self.pcs[call_number]["video_track"] = track
                                # video from client (server use)
                                self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
                                                                                                     call_number, self)
                                self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["video_blackhole"].addTrack(
                                    self.pcs[call_number]["video_track_for_local_use"])
                                await self.pcs[call_number]["video_blackhole"].start()

                        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

                        # handle offer
                        await self.pcs[call_number]["pc"].setRemoteDescription(offer)

                        # send answer
                        answer = await self.pcs[call_number]["pc"].createAnswer()
                        await self.pcs[call_number]["pc"].setLocalDescription(answer)

                        return web.Response(content_type="application/json", text=json.dumps(
                            {"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
                             "type": self.pcs[call_number]["pc"].localDescription.type}))
                    else:
                        return self.reject_offer(call_number, peer_connection)
                else:
                    return self.reject_offer(call_number, peer_connection)
        except:
            error = traceback.format_exc()
            self.to_emitter.send({"type": "error", "error_message": error})

stop_peer_connection:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code> async def stop_peer_connection(self, call_number, uid):
try:
if call_number not in self.pcs:
# Peer connection does not exist
return None
if self.pcs[call_number]["stop_in_progress"]:
# Stop process is already in progress
return None
self.pcs[call_number]["stop_in_progress"] = True
# 1. Notify PyQt5 about the stop
self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
# 2. Empty the correct queue
queue = self.call_queues[call_number - 1]
while not queue.empty():
queue.get()
# 4. Close data channel
try:
self.pcs[call_number]["dc"].close()
except Exception:
pass
# 5. Stop client audio track
try:
await self.pcs[call_number]["audio_blackhole"].stop()
except Exception:
pass
# 6. Notify PyQt5 about call status
self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
# 7. Stop client video track
try:
self.pcs[call_number]["video_track_for_local_use"].stop()
await self.pcs[call_number]["video_blackhole"].stop()
except Exception:
pass
# 9. Release server resources if there are no remaining connections
if len(self.pcs.keys()) == 1: # No active peer connections
# Stop video relay and related blackholes
try:
if self.server_video_blackholde:
await self.server_video_blackholde.stop()
self.server_video_blackholde = None
if self.server_video_track:
await self.server_video_track.stop()
self.server_video_track = None
except Exception:
pass
# Stop server audio stream
try:
if self.server_audio_stream_offer:
self.server_audio_stream_offer.stop()
self.server_audio_stream_offer = None
except Exception:
pass
# Stop and release the webcam
try:
if self.webcam:
if self.webcam.video:
self.webcam.video.stop()
await asyncio.sleep(0.1) # Give time for cleanup
self.webcam = None
except Exception as e:
self.to_emitter.send({"type": "error", "error_message": str(e)})
# Attempt final resource cleanup
gc.collect() # Force garbage collection to release resources
self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})
# 3. Close peer connection
try:
await self.pcs[call_number]["pc"].close()
print('Peer connection closed correctly!!!')
except Exception:
print(traceback.format_exc())
# 8. Cleanup
try:
del self.pcs[call_number]
except Exception:
pass
except Exception:
error = traceback.format_exc()
print(error)
self.to_emitter.send({"type": "error", "error_message": error})
</code>
<code> async def stop_peer_connection(self, call_number, uid): try: if call_number not in self.pcs: # Peer connection does not exist return None if self.pcs[call_number]["stop_in_progress"]: # Stop process is already in progress return None self.pcs[call_number]["stop_in_progress"] = True # 1. Notify PyQt5 about the stop self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number}) # 2. Empty the correct queue queue = self.call_queues[call_number - 1] while not queue.empty(): queue.get() # 4. Close data channel try: self.pcs[call_number]["dc"].close() except Exception: pass # 5. Stop client audio track try: await self.pcs[call_number]["audio_blackhole"].stop() except Exception: pass # 6. Notify PyQt5 about call status self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"}) # 7. Stop client video track try: self.pcs[call_number]["video_track_for_local_use"].stop() await self.pcs[call_number]["video_blackhole"].stop() except Exception: pass # 9. Release server resources if there are no remaining connections if len(self.pcs.keys()) == 1: # No active peer connections # Stop video relay and related blackholes try: if self.server_video_blackholde: await self.server_video_blackholde.stop() self.server_video_blackholde = None if self.server_video_track: await self.server_video_track.stop() self.server_video_track = None except Exception: pass # Stop server audio stream try: if self.server_audio_stream_offer: self.server_audio_stream_offer.stop() self.server_audio_stream_offer = None except Exception: pass # Stop and release the webcam try: if self.webcam: if self.webcam.video: self.webcam.video.stop() await asyncio.sleep(0.1) # Give time for cleanup self.webcam = None except Exception as e: self.to_emitter.send({"type": "error", "error_message": str(e)}) # Attempt final resource cleanup gc.collect() # Force garbage collection to release resources self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."}) # 3. Close peer connection try: await self.pcs[call_number]["pc"].close() print('Peer connection closed correctly!!!') except Exception: print(traceback.format_exc()) # 8. Cleanup try: del self.pcs[call_number] except Exception: pass except Exception: error = traceback.format_exc() print(error) self.to_emitter.send({"type": "error", "error_message": error}) </code>
    async def stop_peer_connection(self, call_number, uid):
        try:
            if call_number not in self.pcs:
                # Peer connection does not exist
                return None
            if self.pcs[call_number]["stop_in_progress"]:
                # Stop process is already in progress
                return None
            self.pcs[call_number]["stop_in_progress"] = True
            # 1. Notify PyQt5 about the stop
            self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
            # 2. Empty the correct queue
            queue = self.call_queues[call_number - 1]
            while not queue.empty():
                queue.get()
            # 4. Close data channel
            try:
                self.pcs[call_number]["dc"].close()
            except Exception:
                pass
            # 5. Stop client audio track
            try:
                await self.pcs[call_number]["audio_blackhole"].stop()
            except Exception:
                pass
            # 6. Notify PyQt5 about call status
            self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
            # 7. Stop client video track
            try:
                self.pcs[call_number]["video_track_for_local_use"].stop()
                await self.pcs[call_number]["video_blackhole"].stop()
            except Exception:
                pass
            # 9. Release server resources if there are no remaining connections
            if len(self.pcs.keys()) == 1:  # No active peer connections
                # Stop video relay and related blackholes
                try:
                    if self.server_video_blackholde:
                        await self.server_video_blackholde.stop()
                        self.server_video_blackholde = None
                    if self.server_video_track:
                        await self.server_video_track.stop()
                        self.server_video_track = None
                except Exception:
                    pass

                # Stop server audio stream
                try:
                    if self.server_audio_stream_offer:
                        self.server_audio_stream_offer.stop()
                        self.server_audio_stream_offer = None
                except Exception:
                    pass

                # Stop and release the webcam
                try:
                    if self.webcam:
                        if self.webcam.video:
                            self.webcam.video.stop()
                        await asyncio.sleep(0.1)  # Give time for cleanup
                        self.webcam = None
                except Exception as e:
                    self.to_emitter.send({"type": "error", "error_message": str(e)})

                # Attempt final resource cleanup
                gc.collect()  # Force garbage collection to release resources
                self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})

            # 3. Close peer connection
            try:
                await self.pcs[call_number]["pc"].close()
                print('Peer connection closed correctly!!!')
            except Exception:
                print(traceback.format_exc())
            # 8. Cleanup
            try:
                del self.pcs[call_number]
            except Exception:
                pass
        except Exception:
            error = traceback.format_exc()
            print(error)
            self.to_emitter.send({"type": "error", "error_message": error})

The problem with this code is that when client is disconnected, so it’s send disconnect message from dc, then stop_peer_connection async method is runned, then this message:

print('Peer connection closed correctly!!!')

never printed because await self.pcs[call_number]["pc"].close() never returns

1

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa

Cannot close correctly aiortc RTCPeerConnection after client disconnect

offer:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>
async def offer(self, request):
try:
params = await request.json()
peer_connection = {
"name": params["name"],
"surname": params["surname"],
"pc": None,
"is_closed": False,
"dc": None,
"uid": uuid.uuid4(),
"audio_track": None,
"audio_track_for_local_use": None,
"audio_blackhole": None,
"video_track": None,
"video_track_for_local_use": None,
"video_blackhole": None,
"offer_in_progress": True,
"call_answered": False,
"manage_call_end_thread": None,
"stop_in_progress": False,
"call_number": None
}
if len(list(self.pcs.keys())) == 3:
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
reserved_call_numbers = list(self.pcs.keys())
if 1 not in reserved_call_numbers:
call_number = 1
elif 2 not in reserved_call_numbers:
call_number = 2
else:
call_number = 3
peer_connection["call_number"] = call_number
self.pcs[call_number] = peer_connection
self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})
timer = 0
while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
if request.transport is None or request.transport.is_closing():
self.to_emitter.send({"type":"transport-error","call-number":call_number})
try:
request.transport.close()
except:
pass
del self.pcs[call_number]
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
timer += 0.1
await asyncio.sleep(0.1)
self.pcs[call_number]["call_answered"] = True
if self.queue.qsize() == 0:
return self.reject_offer(call_number,peer_connection)
else:
data = self.queue.get()
correct_types = ["call-1","call-2","call-3"]
if data["type"] in correct_types:
if (data["call"] == "reject"):
return self.reject_offer(call_number, peer_connection)
elif (data["call"] == "answer"):
while not self.queue.empty():
self.queue.get()
self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))
@self.pcs[call_number]["pc"].on("iceconnectionstatechange")
async def on_ice_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"ICE connection state: {pc.iceConnectionState}")
if pc.iceConnectionState == "failed":
print("ICE connection failed. Attempting to restart ICE.")
await pc.restartIce()
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
if pc.iceConnectionState == "disconnected":
# Wait before handling disconnection
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("connectionstatechange")
async def on_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"Connection state: {pc.connectionState}")
if pc.connectionState in ["failed", "disconnected", "closed"]:
print("Connection failed, disconnected, or closed. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("datachannel")
async def on_datachannel(channel):
self.pcs[call_number]["dc"] = channel
# Send UID to the connecting peer
try:
channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
except Exception as e:
print(f"Error sending UID: {e}")
# Inform about other peers
try:
for uid, pc in self.pcs.items():
if pc['uid'] != peer_connection['uid']:
try:
pc['dc'].send(json.dumps({
"type": "other-uid",
"uid": str(peer_connection['uid']),
"name": peer_connection["name"],
"surname": peer_connection["surname"]
}))
except Exception as e:
print(f"Error sending other-uid message: {e}")
except Exception as e:
print(f"Error informing about other peers: {e}")
@channel.on("message")
async def on_message(message):
try:
message = json.loads(message)
msg_type = message.get("type")
if msg_type == "disconnected":
print('disconnected message from client')
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
elif msg_type in ["offer", "answer", "ice-candidate"]:
target_uid = message["to_uid"]
for uid, pc in self.pcs.items():
if str(pc['uid']) == target_uid:
try:
pc["dc"].send(json.dumps(message))
except Exception as e:
print(f"Error relaying {msg_type}: {e}")
else:
print(f"Unhandled message type: {msg_type}")
except Exception as e:
print(f"Error handling message: {e}")
@channel.on("close")
async def on_close():
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
# audio from server to client
if self.server_audio_stream_offer == None:
self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
#self.server_audio_stream_offer = AudioStreamTrack()
self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)
# video from server to client
if self.server_video_stream_offer is None:
self.server_video_stream_offer = self.create_local_tracks()
self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)
# Attach video from server to QLabel
if self.server_video_track is None:
self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
if self.server_video_blackholde is None:
self.server_video_blackholde = MediaBlackhole()
self.server_video_blackholde.addTrack(self.server_video_track)
await self.server_video_blackholde.start()
@self.pcs[call_number]["pc"].on("track")
async def on_track(track):
if track.kind == "audio":
self.pcs[call_number]["audio_track"] = track
# audio from client (server use)
if call_number == 1:
correct_queue = self.ip_call_1_packet_queue
elif call_number == 2:
correct_queue = self.ip_call_2_packet_queue
else:
correct_queue = self.ip_call_3_packet_queue
self.put_to_q = True
self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
self.pcs[call_number]["audio_blackhole"].addTrack(
self.pcs[call_number]["audio_track_for_local_use"])
await self.pcs[call_number]["audio_blackhole"].start()
else:
self.pcs[call_number]["video_track"] = track
# video from client (server use)
self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
call_number, self)
self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
self.pcs[call_number]["video_blackhole"].addTrack(
self.pcs[call_number]["video_track_for_local_use"])
await self.pcs[call_number]["video_blackhole"].start()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
# handle offer
await self.pcs[call_number]["pc"].setRemoteDescription(offer)
# send answer
answer = await self.pcs[call_number]["pc"].createAnswer()
await self.pcs[call_number]["pc"].setLocalDescription(answer)
return web.Response(content_type="application/json", text=json.dumps(
{"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
"type": self.pcs[call_number]["pc"].localDescription.type}))
else:
return self.reject_offer(call_number, peer_connection)
else:
return self.reject_offer(call_number, peer_connection)
except:
error = traceback.format_exc()
self.to_emitter.send({"type": "error", "error_message": error})
</code>
<code> async def offer(self, request): try: params = await request.json() peer_connection = { "name": params["name"], "surname": params["surname"], "pc": None, "is_closed": False, "dc": None, "uid": uuid.uuid4(), "audio_track": None, "audio_track_for_local_use": None, "audio_blackhole": None, "video_track": None, "video_track_for_local_use": None, "video_blackhole": None, "offer_in_progress": True, "call_answered": False, "manage_call_end_thread": None, "stop_in_progress": False, "call_number": None } if len(list(self.pcs.keys())) == 3: return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""})) reserved_call_numbers = list(self.pcs.keys()) if 1 not in reserved_call_numbers: call_number = 1 elif 2 not in reserved_call_numbers: call_number = 2 else: call_number = 3 peer_connection["call_number"] = call_number self.pcs[call_number] = peer_connection self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']}) timer = 0 while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False): if request.transport is None or request.transport.is_closing(): self.to_emitter.send({"type":"transport-error","call-number":call_number}) try: request.transport.close() except: pass del self.pcs[call_number] return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""})) timer += 0.1 await asyncio.sleep(0.1) self.pcs[call_number]["call_answered"] = True if self.queue.qsize() == 0: return self.reject_offer(call_number,peer_connection) else: data = self.queue.get() correct_types = ["call-1","call-2","call-3"] if data["type"] in correct_types: if (data["call"] == "reject"): return self.reject_offer(call_number, peer_connection) elif (data["call"] == "answer"): while not self.queue.empty(): self.queue.get() self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),])) @self.pcs[call_number]["pc"].on("iceconnectionstatechange") async def on_ice_connection_state_change(): pc = self.pcs[call_number]["pc"] print(f"ICE connection state: {pc.iceConnectionState}") if pc.iceConnectionState == "failed": print("ICE connection failed. Attempting to restart ICE.") await pc.restartIce() await asyncio.sleep(5) # Adjust as needed if pc.iceConnectionState == "disconnected": print("Peer connection lost. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) if pc.iceConnectionState == "disconnected": # Wait before handling disconnection await asyncio.sleep(5) # Adjust as needed if pc.iceConnectionState == "disconnected": print("Peer connection lost. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) @self.pcs[call_number]["pc"].on("connectionstatechange") async def on_connection_state_change(): pc = self.pcs[call_number]["pc"] print(f"Connection state: {pc.connectionState}") if pc.connectionState in ["failed", "disconnected", "closed"]: print("Connection failed, disconnected, or closed. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) @self.pcs[call_number]["pc"].on("datachannel") async def on_datachannel(channel): self.pcs[call_number]["dc"] = channel # Send UID to the connecting peer try: channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])})) except Exception as e: print(f"Error sending UID: {e}") # Inform about other peers try: for uid, pc in self.pcs.items(): if pc['uid'] != peer_connection['uid']: try: pc['dc'].send(json.dumps({ "type": "other-uid", "uid": str(peer_connection['uid']), "name": peer_connection["name"], "surname": peer_connection["surname"] })) except Exception as e: print(f"Error sending other-uid message: {e}") except Exception as e: print(f"Error informing about other peers: {e}") @channel.on("message") async def on_message(message): try: message = json.loads(message) msg_type = message.get("type") if msg_type == "disconnected": print('disconnected message from client') await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) elif msg_type in ["offer", "answer", "ice-candidate"]: target_uid = message["to_uid"] for uid, pc in self.pcs.items(): if str(pc['uid']) == target_uid: try: pc["dc"].send(json.dumps(message)) except Exception as e: print(f"Error relaying {msg_type}: {e}") else: print(f"Unhandled message type: {msg_type}") except Exception as e: print(f"Error handling message: {e}") @channel.on("close") async def on_close(): await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) # audio from server to client if self.server_audio_stream_offer == None: self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter) #self.server_audio_stream_offer = AudioStreamTrack() self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer) # video from server to client if self.server_video_stream_offer is None: self.server_video_stream_offer = self.create_local_tracks() self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer) # Attach video from server to QLabel if self.server_video_track is None: self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter) if self.server_video_blackholde is None: self.server_video_blackholde = MediaBlackhole() self.server_video_blackholde.addTrack(self.server_video_track) await self.server_video_blackholde.start() @self.pcs[call_number]["pc"].on("track") async def on_track(track): if track.kind == "audio": self.pcs[call_number]["audio_track"] = track # audio from client (server use) if call_number == 1: correct_queue = self.ip_call_1_packet_queue elif call_number == 2: correct_queue = self.ip_call_2_packet_queue else: correct_queue = self.ip_call_3_packet_queue self.put_to_q = True self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue) self.pcs[call_number]["audio_blackhole"] = MediaBlackhole() self.pcs[call_number]["audio_blackhole"].addTrack( self.pcs[call_number]["audio_track_for_local_use"]) await self.pcs[call_number]["audio_blackhole"].start() else: self.pcs[call_number]["video_track"] = track # video from client (server use) self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter, call_number, self) self.pcs[call_number]["video_blackhole"] = MediaBlackhole() self.pcs[call_number]["video_blackhole"].addTrack( self.pcs[call_number]["video_track_for_local_use"]) await self.pcs[call_number]["video_blackhole"].start() offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) # handle offer await self.pcs[call_number]["pc"].setRemoteDescription(offer) # send answer answer = await self.pcs[call_number]["pc"].createAnswer() await self.pcs[call_number]["pc"].setLocalDescription(answer) return web.Response(content_type="application/json", text=json.dumps( {"sdp": self.pcs[call_number]["pc"].localDescription.sdp, "type": self.pcs[call_number]["pc"].localDescription.type})) else: return self.reject_offer(call_number, peer_connection) else: return self.reject_offer(call_number, peer_connection) except: error = traceback.format_exc() self.to_emitter.send({"type": "error", "error_message": error}) </code>

    async def offer(self, request):
        try:
            params = await request.json()

            peer_connection = {
                "name": params["name"],
                "surname": params["surname"],
                "pc": None,
                "is_closed": False,
                "dc": None,
                "uid": uuid.uuid4(),
                "audio_track": None,
                "audio_track_for_local_use": None,
                "audio_blackhole": None,
                "video_track": None,
                "video_track_for_local_use": None,
                "video_blackhole": None,
                "offer_in_progress": True,
                "call_answered": False,
                "manage_call_end_thread": None,
                "stop_in_progress": False,
                "call_number": None
            }

            if len(list(self.pcs.keys())) == 3:
                return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))

            reserved_call_numbers = list(self.pcs.keys())

            if 1 not in reserved_call_numbers:
                call_number = 1
            elif 2 not in reserved_call_numbers:
                call_number = 2
            else:
                call_number = 3

            peer_connection["call_number"] = call_number
            self.pcs[call_number] = peer_connection

            self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})

            timer = 0
            while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
                if request.transport is None or request.transport.is_closing():
                    self.to_emitter.send({"type":"transport-error","call-number":call_number})
                    try:
                        request.transport.close()
                    except:
                        pass
                    del self.pcs[call_number]
                    return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
                timer += 0.1
                await asyncio.sleep(0.1)

            self.pcs[call_number]["call_answered"] = True
            if self.queue.qsize() == 0:
                return self.reject_offer(call_number,peer_connection)
            else:
                data = self.queue.get()
                correct_types = ["call-1","call-2","call-3"]
                if data["type"] in correct_types:
                    if (data["call"] == "reject"):
                        return self.reject_offer(call_number, peer_connection)
                    elif (data["call"] == "answer"):
                        while not self.queue.empty():
                            self.queue.get()

                        self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))

                        @self.pcs[call_number]["pc"].on("iceconnectionstatechange")
                        async def on_ice_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"ICE connection state: {pc.iceConnectionState}")

                            if pc.iceConnectionState == "failed":
                                print("ICE connection failed. Attempting to restart ICE.")
                                await pc.restartIce()
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])


                            if pc.iceConnectionState == "disconnected":
                                # Wait before handling disconnection
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("connectionstatechange")
                        async def on_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"Connection state: {pc.connectionState}")
                            if pc.connectionState in ["failed", "disconnected", "closed"]:
                                print("Connection failed, disconnected, or closed. Stopping connection.")
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("datachannel")
                        async def on_datachannel(channel):
                            self.pcs[call_number]["dc"] = channel

                            # Send UID to the connecting peer
                            try:
                                channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
                            except Exception as e:
                                print(f"Error sending UID: {e}")

                            # Inform about other peers
                            try:
                                for uid, pc in self.pcs.items():
                                    if pc['uid'] != peer_connection['uid']:
                                        try:
                                            pc['dc'].send(json.dumps({
                                                "type": "other-uid",
                                                "uid": str(peer_connection['uid']),
                                                "name": peer_connection["name"],
                                                "surname": peer_connection["surname"]
                                            }))
                                        except Exception as e:
                                            print(f"Error sending other-uid message: {e}")
                            except Exception as e:
                                print(f"Error informing about other peers: {e}")

                            @channel.on("message")
                            async def on_message(message):
                                try:
                                    message = json.loads(message)
                                    msg_type = message.get("type")
                                    if msg_type == "disconnected":
                                        print('disconnected message from client')
                                        await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
                                    elif msg_type in ["offer", "answer", "ice-candidate"]:
                                        target_uid = message["to_uid"]
                                        for uid, pc in self.pcs.items():
                                            if str(pc['uid']) == target_uid:
                                                try:
                                                    pc["dc"].send(json.dumps(message))
                                                except Exception as e:
                                                    print(f"Error relaying {msg_type}: {e}")
                                    else:
                                        print(f"Unhandled message type: {msg_type}")
                                except Exception as e:
                                    print(f"Error handling message: {e}")

                            @channel.on("close")
                            async def on_close():
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        # audio from server to client
                        if self.server_audio_stream_offer == None:
                            self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
                            #self.server_audio_stream_offer = AudioStreamTrack()

                        self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)

                        # video from server to client
                        if self.server_video_stream_offer is None:
                            self.server_video_stream_offer = self.create_local_tracks()
                        self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)

                        # Attach video from server to QLabel
                        if self.server_video_track is None:
                            self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
                        if self.server_video_blackholde is None:
                            self.server_video_blackholde = MediaBlackhole()
                            self.server_video_blackholde.addTrack(self.server_video_track)
                            await self.server_video_blackholde.start()

                        @self.pcs[call_number]["pc"].on("track")
                        async def on_track(track):
                            if track.kind == "audio":
                                self.pcs[call_number]["audio_track"] = track
                                # audio from client (server use)
                                if call_number == 1:
                                    correct_queue = self.ip_call_1_packet_queue
                                elif call_number == 2:
                                    correct_queue = self.ip_call_2_packet_queue
                                else:
                                    correct_queue = self.ip_call_3_packet_queue
                                self.put_to_q = True
                                self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
                                self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["audio_blackhole"].addTrack(
                                    self.pcs[call_number]["audio_track_for_local_use"])
                                await self.pcs[call_number]["audio_blackhole"].start()
                            else:
                                self.pcs[call_number]["video_track"] = track
                                # video from client (server use)
                                self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
                                                                                                     call_number, self)
                                self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["video_blackhole"].addTrack(
                                    self.pcs[call_number]["video_track_for_local_use"])
                                await self.pcs[call_number]["video_blackhole"].start()

                        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

                        # handle offer
                        await self.pcs[call_number]["pc"].setRemoteDescription(offer)

                        # send answer
                        answer = await self.pcs[call_number]["pc"].createAnswer()
                        await self.pcs[call_number]["pc"].setLocalDescription(answer)

                        return web.Response(content_type="application/json", text=json.dumps(
                            {"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
                             "type": self.pcs[call_number]["pc"].localDescription.type}))
                    else:
                        return self.reject_offer(call_number, peer_connection)
                else:
                    return self.reject_offer(call_number, peer_connection)
        except:
            error = traceback.format_exc()
            self.to_emitter.send({"type": "error", "error_message": error})

stop_peer_connection:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code> async def stop_peer_connection(self, call_number, uid):
try:
if call_number not in self.pcs:
# Peer connection does not exist
return None
if self.pcs[call_number]["stop_in_progress"]:
# Stop process is already in progress
return None
self.pcs[call_number]["stop_in_progress"] = True
# 1. Notify PyQt5 about the stop
self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
# 2. Empty the correct queue
queue = self.call_queues[call_number - 1]
while not queue.empty():
queue.get()
# 4. Close data channel
try:
self.pcs[call_number]["dc"].close()
except Exception:
pass
# 5. Stop client audio track
try:
await self.pcs[call_number]["audio_blackhole"].stop()
except Exception:
pass
# 6. Notify PyQt5 about call status
self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
# 7. Stop client video track
try:
self.pcs[call_number]["video_track_for_local_use"].stop()
await self.pcs[call_number]["video_blackhole"].stop()
except Exception:
pass
# 9. Release server resources if there are no remaining connections
if len(self.pcs.keys()) == 1: # No active peer connections
# Stop video relay and related blackholes
try:
if self.server_video_blackholde:
await self.server_video_blackholde.stop()
self.server_video_blackholde = None
if self.server_video_track:
await self.server_video_track.stop()
self.server_video_track = None
except Exception:
pass
# Stop server audio stream
try:
if self.server_audio_stream_offer:
self.server_audio_stream_offer.stop()
self.server_audio_stream_offer = None
except Exception:
pass
# Stop and release the webcam
try:
if self.webcam:
if self.webcam.video:
self.webcam.video.stop()
await asyncio.sleep(0.1) # Give time for cleanup
self.webcam = None
except Exception as e:
self.to_emitter.send({"type": "error", "error_message": str(e)})
# Attempt final resource cleanup
gc.collect() # Force garbage collection to release resources
self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})
# 3. Close peer connection
try:
await self.pcs[call_number]["pc"].close()
print('Peer connection closed correctly!!!')
except Exception:
print(traceback.format_exc())
# 8. Cleanup
try:
del self.pcs[call_number]
except Exception:
pass
except Exception:
error = traceback.format_exc()
print(error)
self.to_emitter.send({"type": "error", "error_message": error})
</code>
<code> async def stop_peer_connection(self, call_number, uid): try: if call_number not in self.pcs: # Peer connection does not exist return None if self.pcs[call_number]["stop_in_progress"]: # Stop process is already in progress return None self.pcs[call_number]["stop_in_progress"] = True # 1. Notify PyQt5 about the stop self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number}) # 2. Empty the correct queue queue = self.call_queues[call_number - 1] while not queue.empty(): queue.get() # 4. Close data channel try: self.pcs[call_number]["dc"].close() except Exception: pass # 5. Stop client audio track try: await self.pcs[call_number]["audio_blackhole"].stop() except Exception: pass # 6. Notify PyQt5 about call status self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"}) # 7. Stop client video track try: self.pcs[call_number]["video_track_for_local_use"].stop() await self.pcs[call_number]["video_blackhole"].stop() except Exception: pass # 9. Release server resources if there are no remaining connections if len(self.pcs.keys()) == 1: # No active peer connections # Stop video relay and related blackholes try: if self.server_video_blackholde: await self.server_video_blackholde.stop() self.server_video_blackholde = None if self.server_video_track: await self.server_video_track.stop() self.server_video_track = None except Exception: pass # Stop server audio stream try: if self.server_audio_stream_offer: self.server_audio_stream_offer.stop() self.server_audio_stream_offer = None except Exception: pass # Stop and release the webcam try: if self.webcam: if self.webcam.video: self.webcam.video.stop() await asyncio.sleep(0.1) # Give time for cleanup self.webcam = None except Exception as e: self.to_emitter.send({"type": "error", "error_message": str(e)}) # Attempt final resource cleanup gc.collect() # Force garbage collection to release resources self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."}) # 3. Close peer connection try: await self.pcs[call_number]["pc"].close() print('Peer connection closed correctly!!!') except Exception: print(traceback.format_exc()) # 8. Cleanup try: del self.pcs[call_number] except Exception: pass except Exception: error = traceback.format_exc() print(error) self.to_emitter.send({"type": "error", "error_message": error}) </code>
    async def stop_peer_connection(self, call_number, uid):
        try:
            if call_number not in self.pcs:
                # Peer connection does not exist
                return None
            if self.pcs[call_number]["stop_in_progress"]:
                # Stop process is already in progress
                return None
            self.pcs[call_number]["stop_in_progress"] = True
            # 1. Notify PyQt5 about the stop
            self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
            # 2. Empty the correct queue
            queue = self.call_queues[call_number - 1]
            while not queue.empty():
                queue.get()
            # 4. Close data channel
            try:
                self.pcs[call_number]["dc"].close()
            except Exception:
                pass
            # 5. Stop client audio track
            try:
                await self.pcs[call_number]["audio_blackhole"].stop()
            except Exception:
                pass
            # 6. Notify PyQt5 about call status
            self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
            # 7. Stop client video track
            try:
                self.pcs[call_number]["video_track_for_local_use"].stop()
                await self.pcs[call_number]["video_blackhole"].stop()
            except Exception:
                pass
            # 9. Release server resources if there are no remaining connections
            if len(self.pcs.keys()) == 1:  # No active peer connections
                # Stop video relay and related blackholes
                try:
                    if self.server_video_blackholde:
                        await self.server_video_blackholde.stop()
                        self.server_video_blackholde = None
                    if self.server_video_track:
                        await self.server_video_track.stop()
                        self.server_video_track = None
                except Exception:
                    pass

                # Stop server audio stream
                try:
                    if self.server_audio_stream_offer:
                        self.server_audio_stream_offer.stop()
                        self.server_audio_stream_offer = None
                except Exception:
                    pass

                # Stop and release the webcam
                try:
                    if self.webcam:
                        if self.webcam.video:
                            self.webcam.video.stop()
                        await asyncio.sleep(0.1)  # Give time for cleanup
                        self.webcam = None
                except Exception as e:
                    self.to_emitter.send({"type": "error", "error_message": str(e)})

                # Attempt final resource cleanup
                gc.collect()  # Force garbage collection to release resources
                self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})

            # 3. Close peer connection
            try:
                await self.pcs[call_number]["pc"].close()
                print('Peer connection closed correctly!!!')
            except Exception:
                print(traceback.format_exc())
            # 8. Cleanup
            try:
                del self.pcs[call_number]
            except Exception:
                pass
        except Exception:
            error = traceback.format_exc()
            print(error)
            self.to_emitter.send({"type": "error", "error_message": error})

The problem with this code is that when client is disconnected, so it’s send disconnect message from dc, then stop_peer_connection async method is runned, then this message:

print('Peer connection closed correctly!!!')

never printed because await self.pcs[call_number]["pc"].close() never returns

1

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa

Cannot close correctly aiortc RTCPeerConnection after client disconnect

offer:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code>
async def offer(self, request):
try:
params = await request.json()
peer_connection = {
"name": params["name"],
"surname": params["surname"],
"pc": None,
"is_closed": False,
"dc": None,
"uid": uuid.uuid4(),
"audio_track": None,
"audio_track_for_local_use": None,
"audio_blackhole": None,
"video_track": None,
"video_track_for_local_use": None,
"video_blackhole": None,
"offer_in_progress": True,
"call_answered": False,
"manage_call_end_thread": None,
"stop_in_progress": False,
"call_number": None
}
if len(list(self.pcs.keys())) == 3:
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
reserved_call_numbers = list(self.pcs.keys())
if 1 not in reserved_call_numbers:
call_number = 1
elif 2 not in reserved_call_numbers:
call_number = 2
else:
call_number = 3
peer_connection["call_number"] = call_number
self.pcs[call_number] = peer_connection
self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})
timer = 0
while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
if request.transport is None or request.transport.is_closing():
self.to_emitter.send({"type":"transport-error","call-number":call_number})
try:
request.transport.close()
except:
pass
del self.pcs[call_number]
return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
timer += 0.1
await asyncio.sleep(0.1)
self.pcs[call_number]["call_answered"] = True
if self.queue.qsize() == 0:
return self.reject_offer(call_number,peer_connection)
else:
data = self.queue.get()
correct_types = ["call-1","call-2","call-3"]
if data["type"] in correct_types:
if (data["call"] == "reject"):
return self.reject_offer(call_number, peer_connection)
elif (data["call"] == "answer"):
while not self.queue.empty():
self.queue.get()
self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))
@self.pcs[call_number]["pc"].on("iceconnectionstatechange")
async def on_ice_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"ICE connection state: {pc.iceConnectionState}")
if pc.iceConnectionState == "failed":
print("ICE connection failed. Attempting to restart ICE.")
await pc.restartIce()
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
if pc.iceConnectionState == "disconnected":
# Wait before handling disconnection
await asyncio.sleep(5) # Adjust as needed
if pc.iceConnectionState == "disconnected":
print("Peer connection lost. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("connectionstatechange")
async def on_connection_state_change():
pc = self.pcs[call_number]["pc"]
print(f"Connection state: {pc.connectionState}")
if pc.connectionState in ["failed", "disconnected", "closed"]:
print("Connection failed, disconnected, or closed. Stopping connection.")
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
@self.pcs[call_number]["pc"].on("datachannel")
async def on_datachannel(channel):
self.pcs[call_number]["dc"] = channel
# Send UID to the connecting peer
try:
channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
except Exception as e:
print(f"Error sending UID: {e}")
# Inform about other peers
try:
for uid, pc in self.pcs.items():
if pc['uid'] != peer_connection['uid']:
try:
pc['dc'].send(json.dumps({
"type": "other-uid",
"uid": str(peer_connection['uid']),
"name": peer_connection["name"],
"surname": peer_connection["surname"]
}))
except Exception as e:
print(f"Error sending other-uid message: {e}")
except Exception as e:
print(f"Error informing about other peers: {e}")
@channel.on("message")
async def on_message(message):
try:
message = json.loads(message)
msg_type = message.get("type")
if msg_type == "disconnected":
print('disconnected message from client')
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
elif msg_type in ["offer", "answer", "ice-candidate"]:
target_uid = message["to_uid"]
for uid, pc in self.pcs.items():
if str(pc['uid']) == target_uid:
try:
pc["dc"].send(json.dumps(message))
except Exception as e:
print(f"Error relaying {msg_type}: {e}")
else:
print(f"Unhandled message type: {msg_type}")
except Exception as e:
print(f"Error handling message: {e}")
@channel.on("close")
async def on_close():
await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
# audio from server to client
if self.server_audio_stream_offer == None:
self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
#self.server_audio_stream_offer = AudioStreamTrack()
self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)
# video from server to client
if self.server_video_stream_offer is None:
self.server_video_stream_offer = self.create_local_tracks()
self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)
# Attach video from server to QLabel
if self.server_video_track is None:
self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
if self.server_video_blackholde is None:
self.server_video_blackholde = MediaBlackhole()
self.server_video_blackholde.addTrack(self.server_video_track)
await self.server_video_blackholde.start()
@self.pcs[call_number]["pc"].on("track")
async def on_track(track):
if track.kind == "audio":
self.pcs[call_number]["audio_track"] = track
# audio from client (server use)
if call_number == 1:
correct_queue = self.ip_call_1_packet_queue
elif call_number == 2:
correct_queue = self.ip_call_2_packet_queue
else:
correct_queue = self.ip_call_3_packet_queue
self.put_to_q = True
self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
self.pcs[call_number]["audio_blackhole"].addTrack(
self.pcs[call_number]["audio_track_for_local_use"])
await self.pcs[call_number]["audio_blackhole"].start()
else:
self.pcs[call_number]["video_track"] = track
# video from client (server use)
self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
call_number, self)
self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
self.pcs[call_number]["video_blackhole"].addTrack(
self.pcs[call_number]["video_track_for_local_use"])
await self.pcs[call_number]["video_blackhole"].start()
offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])
# handle offer
await self.pcs[call_number]["pc"].setRemoteDescription(offer)
# send answer
answer = await self.pcs[call_number]["pc"].createAnswer()
await self.pcs[call_number]["pc"].setLocalDescription(answer)
return web.Response(content_type="application/json", text=json.dumps(
{"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
"type": self.pcs[call_number]["pc"].localDescription.type}))
else:
return self.reject_offer(call_number, peer_connection)
else:
return self.reject_offer(call_number, peer_connection)
except:
error = traceback.format_exc()
self.to_emitter.send({"type": "error", "error_message": error})
</code>
<code> async def offer(self, request): try: params = await request.json() peer_connection = { "name": params["name"], "surname": params["surname"], "pc": None, "is_closed": False, "dc": None, "uid": uuid.uuid4(), "audio_track": None, "audio_track_for_local_use": None, "audio_blackhole": None, "video_track": None, "video_track_for_local_use": None, "video_blackhole": None, "offer_in_progress": True, "call_answered": False, "manage_call_end_thread": None, "stop_in_progress": False, "call_number": None } if len(list(self.pcs.keys())) == 3: return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""})) reserved_call_numbers = list(self.pcs.keys()) if 1 not in reserved_call_numbers: call_number = 1 elif 2 not in reserved_call_numbers: call_number = 2 else: call_number = 3 peer_connection["call_number"] = call_number self.pcs[call_number] = peer_connection self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']}) timer = 0 while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False): if request.transport is None or request.transport.is_closing(): self.to_emitter.send({"type":"transport-error","call-number":call_number}) try: request.transport.close() except: pass del self.pcs[call_number] return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""})) timer += 0.1 await asyncio.sleep(0.1) self.pcs[call_number]["call_answered"] = True if self.queue.qsize() == 0: return self.reject_offer(call_number,peer_connection) else: data = self.queue.get() correct_types = ["call-1","call-2","call-3"] if data["type"] in correct_types: if (data["call"] == "reject"): return self.reject_offer(call_number, peer_connection) elif (data["call"] == "answer"): while not self.queue.empty(): self.queue.get() self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),])) @self.pcs[call_number]["pc"].on("iceconnectionstatechange") async def on_ice_connection_state_change(): pc = self.pcs[call_number]["pc"] print(f"ICE connection state: {pc.iceConnectionState}") if pc.iceConnectionState == "failed": print("ICE connection failed. Attempting to restart ICE.") await pc.restartIce() await asyncio.sleep(5) # Adjust as needed if pc.iceConnectionState == "disconnected": print("Peer connection lost. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) if pc.iceConnectionState == "disconnected": # Wait before handling disconnection await asyncio.sleep(5) # Adjust as needed if pc.iceConnectionState == "disconnected": print("Peer connection lost. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) @self.pcs[call_number]["pc"].on("connectionstatechange") async def on_connection_state_change(): pc = self.pcs[call_number]["pc"] print(f"Connection state: {pc.connectionState}") if pc.connectionState in ["failed", "disconnected", "closed"]: print("Connection failed, disconnected, or closed. Stopping connection.") await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) @self.pcs[call_number]["pc"].on("datachannel") async def on_datachannel(channel): self.pcs[call_number]["dc"] = channel # Send UID to the connecting peer try: channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])})) except Exception as e: print(f"Error sending UID: {e}") # Inform about other peers try: for uid, pc in self.pcs.items(): if pc['uid'] != peer_connection['uid']: try: pc['dc'].send(json.dumps({ "type": "other-uid", "uid": str(peer_connection['uid']), "name": peer_connection["name"], "surname": peer_connection["surname"] })) except Exception as e: print(f"Error sending other-uid message: {e}") except Exception as e: print(f"Error informing about other peers: {e}") @channel.on("message") async def on_message(message): try: message = json.loads(message) msg_type = message.get("type") if msg_type == "disconnected": print('disconnected message from client') await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) elif msg_type in ["offer", "answer", "ice-candidate"]: target_uid = message["to_uid"] for uid, pc in self.pcs.items(): if str(pc['uid']) == target_uid: try: pc["dc"].send(json.dumps(message)) except Exception as e: print(f"Error relaying {msg_type}: {e}") else: print(f"Unhandled message type: {msg_type}") except Exception as e: print(f"Error handling message: {e}") @channel.on("close") async def on_close(): await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"]) # audio from server to client if self.server_audio_stream_offer == None: self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter) #self.server_audio_stream_offer = AudioStreamTrack() self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer) # video from server to client if self.server_video_stream_offer is None: self.server_video_stream_offer = self.create_local_tracks() self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer) # Attach video from server to QLabel if self.server_video_track is None: self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter) if self.server_video_blackholde is None: self.server_video_blackholde = MediaBlackhole() self.server_video_blackholde.addTrack(self.server_video_track) await self.server_video_blackholde.start() @self.pcs[call_number]["pc"].on("track") async def on_track(track): if track.kind == "audio": self.pcs[call_number]["audio_track"] = track # audio from client (server use) if call_number == 1: correct_queue = self.ip_call_1_packet_queue elif call_number == 2: correct_queue = self.ip_call_2_packet_queue else: correct_queue = self.ip_call_3_packet_queue self.put_to_q = True self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue) self.pcs[call_number]["audio_blackhole"] = MediaBlackhole() self.pcs[call_number]["audio_blackhole"].addTrack( self.pcs[call_number]["audio_track_for_local_use"]) await self.pcs[call_number]["audio_blackhole"].start() else: self.pcs[call_number]["video_track"] = track # video from client (server use) self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter, call_number, self) self.pcs[call_number]["video_blackhole"] = MediaBlackhole() self.pcs[call_number]["video_blackhole"].addTrack( self.pcs[call_number]["video_track_for_local_use"]) await self.pcs[call_number]["video_blackhole"].start() offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"]) # handle offer await self.pcs[call_number]["pc"].setRemoteDescription(offer) # send answer answer = await self.pcs[call_number]["pc"].createAnswer() await self.pcs[call_number]["pc"].setLocalDescription(answer) return web.Response(content_type="application/json", text=json.dumps( {"sdp": self.pcs[call_number]["pc"].localDescription.sdp, "type": self.pcs[call_number]["pc"].localDescription.type})) else: return self.reject_offer(call_number, peer_connection) else: return self.reject_offer(call_number, peer_connection) except: error = traceback.format_exc() self.to_emitter.send({"type": "error", "error_message": error}) </code>

    async def offer(self, request):
        try:
            params = await request.json()

            peer_connection = {
                "name": params["name"],
                "surname": params["surname"],
                "pc": None,
                "is_closed": False,
                "dc": None,
                "uid": uuid.uuid4(),
                "audio_track": None,
                "audio_track_for_local_use": None,
                "audio_blackhole": None,
                "video_track": None,
                "video_track_for_local_use": None,
                "video_blackhole": None,
                "offer_in_progress": True,
                "call_answered": False,
                "manage_call_end_thread": None,
                "stop_in_progress": False,
                "call_number": None
            }

            if len(list(self.pcs.keys())) == 3:
                return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))

            reserved_call_numbers = list(self.pcs.keys())

            if 1 not in reserved_call_numbers:
                call_number = 1
            elif 2 not in reserved_call_numbers:
                call_number = 2
            else:
                call_number = 3

            peer_connection["call_number"] = call_number
            self.pcs[call_number] = peer_connection

            self.to_emitter.send({"type": "call_"+str(call_number)+"_offering", "name": peer_connection["name"], "surname": peer_connection["surname"],'uid':peer_connection['uid']})

            timer = 0
            while (timer < self.configuration["aiortc_time_window_for_answer_ms"]/1000 and self.queue.qsize() == 0 and self.pcs[call_number]["call_answered"] == False):
                if request.transport is None or request.transport.is_closing():
                    self.to_emitter.send({"type":"transport-error","call-number":call_number})
                    try:
                        request.transport.close()
                    except:
                        pass
                    del self.pcs[call_number]
                    return web.Response(content_type="application/json", text=json.dumps({"sdp": "", "type": ""}))
                timer += 0.1
                await asyncio.sleep(0.1)

            self.pcs[call_number]["call_answered"] = True
            if self.queue.qsize() == 0:
                return self.reject_offer(call_number,peer_connection)
            else:
                data = self.queue.get()
                correct_types = ["call-1","call-2","call-3"]
                if data["type"] in correct_types:
                    if (data["call"] == "reject"):
                        return self.reject_offer(call_number, peer_connection)
                    elif (data["call"] == "answer"):
                        while not self.queue.empty():
                            self.queue.get()

                        self.pcs[call_number]["pc"] = RTCPeerConnection(configuration=RTCConfiguration([RTCIceServer("stun:stun.l.google:19302"),]))

                        @self.pcs[call_number]["pc"].on("iceconnectionstatechange")
                        async def on_ice_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"ICE connection state: {pc.iceConnectionState}")

                            if pc.iceConnectionState == "failed":
                                print("ICE connection failed. Attempting to restart ICE.")
                                await pc.restartIce()
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])


                            if pc.iceConnectionState == "disconnected":
                                # Wait before handling disconnection
                                await asyncio.sleep(5)  # Adjust as needed
                                if pc.iceConnectionState == "disconnected":
                                    print("Peer connection lost. Stopping connection.")
                                    await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("connectionstatechange")
                        async def on_connection_state_change():
                            pc = self.pcs[call_number]["pc"]
                            print(f"Connection state: {pc.connectionState}")
                            if pc.connectionState in ["failed", "disconnected", "closed"]:
                                print("Connection failed, disconnected, or closed. Stopping connection.")
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        @self.pcs[call_number]["pc"].on("datachannel")
                        async def on_datachannel(channel):
                            self.pcs[call_number]["dc"] = channel

                            # Send UID to the connecting peer
                            try:
                                channel.send(json.dumps({"type": "uid", "uid": str(peer_connection["uid"])}))
                            except Exception as e:
                                print(f"Error sending UID: {e}")

                            # Inform about other peers
                            try:
                                for uid, pc in self.pcs.items():
                                    if pc['uid'] != peer_connection['uid']:
                                        try:
                                            pc['dc'].send(json.dumps({
                                                "type": "other-uid",
                                                "uid": str(peer_connection['uid']),
                                                "name": peer_connection["name"],
                                                "surname": peer_connection["surname"]
                                            }))
                                        except Exception as e:
                                            print(f"Error sending other-uid message: {e}")
                            except Exception as e:
                                print(f"Error informing about other peers: {e}")

                            @channel.on("message")
                            async def on_message(message):
                                try:
                                    message = json.loads(message)
                                    msg_type = message.get("type")
                                    if msg_type == "disconnected":
                                        print('disconnected message from client')
                                        await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])
                                    elif msg_type in ["offer", "answer", "ice-candidate"]:
                                        target_uid = message["to_uid"]
                                        for uid, pc in self.pcs.items():
                                            if str(pc['uid']) == target_uid:
                                                try:
                                                    pc["dc"].send(json.dumps(message))
                                                except Exception as e:
                                                    print(f"Error relaying {msg_type}: {e}")
                                    else:
                                        print(f"Unhandled message type: {msg_type}")
                                except Exception as e:
                                    print(f"Error handling message: {e}")

                            @channel.on("close")
                            async def on_close():
                                await self.stop_peer_connection(call_number, self.pcs[call_number]["uid"])

                        # audio from server to client
                        if self.server_audio_stream_offer == None:
                            self.server_audio_stream_offer = Server_Audio_Stream_Offer(self.speackers_deck_queue,self.to_emitter)
                            #self.server_audio_stream_offer = AudioStreamTrack()

                        self.pcs[call_number]["pc"].addTrack(self.server_audio_stream_offer)

                        # video from server to client
                        if self.server_video_stream_offer is None:
                            self.server_video_stream_offer = self.create_local_tracks()
                        self.pcs[call_number]["pc"].addTrack(self.server_video_stream_offer)

                        # Attach video from server to QLabel
                        if self.server_video_track is None:
                            self.server_video_track = WebCamera(self.server_video_stream_offer,self.to_emitter)
                        if self.server_video_blackholde is None:
                            self.server_video_blackholde = MediaBlackhole()
                            self.server_video_blackholde.addTrack(self.server_video_track)
                            await self.server_video_blackholde.start()

                        @self.pcs[call_number]["pc"].on("track")
                        async def on_track(track):
                            if track.kind == "audio":
                                self.pcs[call_number]["audio_track"] = track
                                # audio from client (server use)
                                if call_number == 1:
                                    correct_queue = self.ip_call_1_packet_queue
                                elif call_number == 2:
                                    correct_queue = self.ip_call_2_packet_queue
                                else:
                                    correct_queue = self.ip_call_3_packet_queue
                                self.put_to_q = True
                                self.pcs[call_number]["audio_track_for_local_use"] = ClientTrack(track, self, self.to_emitter,call_number,self.put_to_q,correct_queue)
                                self.pcs[call_number]["audio_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["audio_blackhole"].addTrack(
                                    self.pcs[call_number]["audio_track_for_local_use"])
                                await self.pcs[call_number]["audio_blackhole"].start()
                            else:
                                self.pcs[call_number]["video_track"] = track
                                # video from client (server use)
                                self.pcs[call_number]["video_track_for_local_use"] = ClientWebCamera(track, self.to_emitter,
                                                                                                     call_number, self)
                                self.pcs[call_number]["video_blackhole"] = MediaBlackhole()
                                self.pcs[call_number]["video_blackhole"].addTrack(
                                    self.pcs[call_number]["video_track_for_local_use"])
                                await self.pcs[call_number]["video_blackhole"].start()

                        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

                        # handle offer
                        await self.pcs[call_number]["pc"].setRemoteDescription(offer)

                        # send answer
                        answer = await self.pcs[call_number]["pc"].createAnswer()
                        await self.pcs[call_number]["pc"].setLocalDescription(answer)

                        return web.Response(content_type="application/json", text=json.dumps(
                            {"sdp": self.pcs[call_number]["pc"].localDescription.sdp,
                             "type": self.pcs[call_number]["pc"].localDescription.type}))
                    else:
                        return self.reject_offer(call_number, peer_connection)
                else:
                    return self.reject_offer(call_number, peer_connection)
        except:
            error = traceback.format_exc()
            self.to_emitter.send({"type": "error", "error_message": error})

stop_peer_connection:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
<code> async def stop_peer_connection(self, call_number, uid):
try:
if call_number not in self.pcs:
# Peer connection does not exist
return None
if self.pcs[call_number]["stop_in_progress"]:
# Stop process is already in progress
return None
self.pcs[call_number]["stop_in_progress"] = True
# 1. Notify PyQt5 about the stop
self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
# 2. Empty the correct queue
queue = self.call_queues[call_number - 1]
while not queue.empty():
queue.get()
# 4. Close data channel
try:
self.pcs[call_number]["dc"].close()
except Exception:
pass
# 5. Stop client audio track
try:
await self.pcs[call_number]["audio_blackhole"].stop()
except Exception:
pass
# 6. Notify PyQt5 about call status
self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
# 7. Stop client video track
try:
self.pcs[call_number]["video_track_for_local_use"].stop()
await self.pcs[call_number]["video_blackhole"].stop()
except Exception:
pass
# 9. Release server resources if there are no remaining connections
if len(self.pcs.keys()) == 1: # No active peer connections
# Stop video relay and related blackholes
try:
if self.server_video_blackholde:
await self.server_video_blackholde.stop()
self.server_video_blackholde = None
if self.server_video_track:
await self.server_video_track.stop()
self.server_video_track = None
except Exception:
pass
# Stop server audio stream
try:
if self.server_audio_stream_offer:
self.server_audio_stream_offer.stop()
self.server_audio_stream_offer = None
except Exception:
pass
# Stop and release the webcam
try:
if self.webcam:
if self.webcam.video:
self.webcam.video.stop()
await asyncio.sleep(0.1) # Give time for cleanup
self.webcam = None
except Exception as e:
self.to_emitter.send({"type": "error", "error_message": str(e)})
# Attempt final resource cleanup
gc.collect() # Force garbage collection to release resources
self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})
# 3. Close peer connection
try:
await self.pcs[call_number]["pc"].close()
print('Peer connection closed correctly!!!')
except Exception:
print(traceback.format_exc())
# 8. Cleanup
try:
del self.pcs[call_number]
except Exception:
pass
except Exception:
error = traceback.format_exc()
print(error)
self.to_emitter.send({"type": "error", "error_message": error})
</code>
<code> async def stop_peer_connection(self, call_number, uid): try: if call_number not in self.pcs: # Peer connection does not exist return None if self.pcs[call_number]["stop_in_progress"]: # Stop process is already in progress return None self.pcs[call_number]["stop_in_progress"] = True # 1. Notify PyQt5 about the stop self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number}) # 2. Empty the correct queue queue = self.call_queues[call_number - 1] while not queue.empty(): queue.get() # 4. Close data channel try: self.pcs[call_number]["dc"].close() except Exception: pass # 5. Stop client audio track try: await self.pcs[call_number]["audio_blackhole"].stop() except Exception: pass # 6. Notify PyQt5 about call status self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"}) # 7. Stop client video track try: self.pcs[call_number]["video_track_for_local_use"].stop() await self.pcs[call_number]["video_blackhole"].stop() except Exception: pass # 9. Release server resources if there are no remaining connections if len(self.pcs.keys()) == 1: # No active peer connections # Stop video relay and related blackholes try: if self.server_video_blackholde: await self.server_video_blackholde.stop() self.server_video_blackholde = None if self.server_video_track: await self.server_video_track.stop() self.server_video_track = None except Exception: pass # Stop server audio stream try: if self.server_audio_stream_offer: self.server_audio_stream_offer.stop() self.server_audio_stream_offer = None except Exception: pass # Stop and release the webcam try: if self.webcam: if self.webcam.video: self.webcam.video.stop() await asyncio.sleep(0.1) # Give time for cleanup self.webcam = None except Exception as e: self.to_emitter.send({"type": "error", "error_message": str(e)}) # Attempt final resource cleanup gc.collect() # Force garbage collection to release resources self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."}) # 3. Close peer connection try: await self.pcs[call_number]["pc"].close() print('Peer connection closed correctly!!!') except Exception: print(traceback.format_exc()) # 8. Cleanup try: del self.pcs[call_number] except Exception: pass except Exception: error = traceback.format_exc() print(error) self.to_emitter.send({"type": "error", "error_message": error}) </code>
    async def stop_peer_connection(self, call_number, uid):
        try:
            if call_number not in self.pcs:
                # Peer connection does not exist
                return None
            if self.pcs[call_number]["stop_in_progress"]:
                # Stop process is already in progress
                return None
            self.pcs[call_number]["stop_in_progress"] = True
            # 1. Notify PyQt5 about the stop
            self.to_emitter.send({"type": "stop-peer-connection", "call_number": call_number})
            # 2. Empty the correct queue
            queue = self.call_queues[call_number - 1]
            while not queue.empty():
                queue.get()
            # 4. Close data channel
            try:
                self.pcs[call_number]["dc"].close()
            except Exception:
                pass
            # 5. Stop client audio track
            try:
                await self.pcs[call_number]["audio_blackhole"].stop()
            except Exception:
                pass
            # 6. Notify PyQt5 about call status
            self.to_emitter.send({"type": f"call-{call_number}-status", "status": "closed-by-client"})
            # 7. Stop client video track
            try:
                self.pcs[call_number]["video_track_for_local_use"].stop()
                await self.pcs[call_number]["video_blackhole"].stop()
            except Exception:
                pass
            # 9. Release server resources if there are no remaining connections
            if len(self.pcs.keys()) == 1:  # No active peer connections
                # Stop video relay and related blackholes
                try:
                    if self.server_video_blackholde:
                        await self.server_video_blackholde.stop()
                        self.server_video_blackholde = None
                    if self.server_video_track:
                        await self.server_video_track.stop()
                        self.server_video_track = None
                except Exception:
                    pass

                # Stop server audio stream
                try:
                    if self.server_audio_stream_offer:
                        self.server_audio_stream_offer.stop()
                        self.server_audio_stream_offer = None
                except Exception:
                    pass

                # Stop and release the webcam
                try:
                    if self.webcam:
                        if self.webcam.video:
                            self.webcam.video.stop()
                        await asyncio.sleep(0.1)  # Give time for cleanup
                        self.webcam = None
                except Exception as e:
                    self.to_emitter.send({"type": "error", "error_message": str(e)})

                # Attempt final resource cleanup
                gc.collect()  # Force garbage collection to release resources
                self.to_emitter.send({"type": "log", "message": "Garbage collection triggered for cleanup."})

            # 3. Close peer connection
            try:
                await self.pcs[call_number]["pc"].close()
                print('Peer connection closed correctly!!!')
            except Exception:
                print(traceback.format_exc())
            # 8. Cleanup
            try:
                del self.pcs[call_number]
            except Exception:
                pass
        except Exception:
            error = traceback.format_exc()
            print(error)
            self.to_emitter.send({"type": "error", "error_message": error})

The problem with this code is that when client is disconnected, so it’s send disconnect message from dc, then stop_peer_connection async method is runned, then this message:

print('Peer connection closed correctly!!!')

never printed because await self.pcs[call_number]["pc"].close() never returns

1

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật