I’ve tried to rewrit this early draft of hte scirpt multiple times and end up with the scirpt either freezing or crashing due to a broken pipe.
currently with this implementation it is freezing at this point in the log statements before the scirpt no longer runs.
full script and log statements attatched below.
Tue Aug 6 20:53:07 2024 – Setting up CUDA environment variable for GPU processing
Tue Aug 6 20:53:07 2024 – Setting GST_PLUGIN_PATH to /usr/lib/x86_64-linux-gnu/gstreamer-1.0
Tue Aug 6 20:53:07 2024 – Starting script
Tue Aug 6 20:53:07 2024 – Initializing GStreamer
Tue Aug 6 20:53:10 2024 – Enabled detailed GStreamer debug output
Tue Aug 6 20:53:10 2024 – Set GST_DEBUG_DUMP_DOT_DIR to /home/ubuntu/dot
Tue Aug 6 20:53:10 2024 – GStreamer pipeline element created successfully
Tue Aug 6 20:53:10 2024 – GStreamer source element created successfully
Tue Aug 6 20:53:10 2024 – GStreamer audiorate element created successfully
Tue Aug 6 20:53:10 2024 – GStreamer convert element created successfully
Tue Aug 6 20:53:10 2024 – GStreamer resample element created successfully
Tue Aug 6 20:53:10 2024 – GStreamer sink element created successfully
Tue Aug 6 20:53:10 2024 – GStreamer queue element created successfully
Tue Aug 6 20:53:10 2024 – Adding elements to GStreamer pipeline
Tue Aug 6 20:53:10 2024 – Successfully added source to the pipeline
Tue Aug 6 20:53:10 2024 – Successfully added queue to the pipeline
Tue Aug 6 20:53:10 2024 – Successfully added audiorate to the pipeline
Tue Aug 6 20:53:10 2024 – Successfully added convert to the pipeline
Tue Aug 6 20:53:10 2024 – Successfully added resample to the pipeline
Tue Aug 6 20:53:10 2024 – Successfully added sink to the pipeline
Tue Aug 6 20:53:10 2024 – Setting Twitch stream URL
Tue Aug 6 20:53:10 2024 – Adding signal watch to GStreamer bus
Tue Aug 6 20:53:10 2024 – Setting GStreamer pipeline to PLAYING state
Tue Aug 6 20:53:10 2024 – Pipeline state change is asynchronous
Tue Aug 6 20:53:12 2024 – New pad ‘src_0’ added to ‘source’
Tue Aug 6 20:53:12 2024 – Pad ‘src_0’ is not an audio pad, media type: ‘video/x-raw, width=(int)[ 48, 4096 ], height=(int)[ 16, 4096 ], framerate=(fraction)[ 0/1, 2147483647/1 ], format=(string){ NV12 }; video/x-raw(memory:GLMemory), width=(int)[ 48, 4096 ], height=(int)[ 16, 4096 ], framerate=(fraction)[ 0/1, 2147483647/1 ], format=(string){ NV12 }; video/x-raw(memory:CUDAMemory), width=(int)[ 48, 4096 ], height=(int)[ 16, 4096 ], framerate=(fraction)[ 0/1, 2147483647/1 ], format=(string){ NV12 }’
Tue Aug 6 20:53:12 2024 – New pad ‘src_1’ added to ‘source’
Tue Aug 6 20:53:12 2024 – Pad ‘src_1’ is an audio pad with media type ‘audio/x-raw, format=(string)S16LE, layout=(string)interleaved, rate=(int)[ 8000, 96000 ], channels=(int)[ 1, 8 ]’
Tue Aug 6 20:53:12 2024 – Successfully linked pad ‘src_1’ to audiorate sink pad
script freezes here
import subprocess
import numpy as np
import librosa
import logging
import os
import torch
from funasr import AutoModel
import signal
import time
import streamlink
import gi
# GStreamer setup
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
# Constants
RATE = 16000
CHUNK = 1024
USERNAME = 'twitch_streamer_username'
MAX_RETRIES = 5
CHECKPOINT_DIR = './checkpoints'
# Twitch API credentials
CLIENT_ID = 'xxxxxxxxxxxxx'
CLIENT_SECRET = 'xxxxxxxxxxxxxx'
# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def custom_sigpipe_handler(signum, frame):
print(f"{time.ctime()} - SIGPIPE received: Broken pipe error", flush=True)
logging.error("SIGPIPE received: Broken pipe error")
# Set up signal handling for SIGPIPE
signal.signal(signal.SIGPIPE, custom_sigpipe_handler)
# Ensure proper CUDA environment variable is set for GPU processing
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
print(f"{time.ctime()} - Setting up CUDA environment variable for GPU processing", flush=True)
# GStreamer Plugin Pathway
plugin_path = "/usr/lib/x86_64-linux-gnu/gstreamer-1.0"
os.environ["GST_PLUGIN_PATH"] = plugin_path
print(f"{time.ctime()} - Setting GST_PLUGIN_PATH to {plugin_path}", flush=True)
# Custom SIGINT handler
def custom_sigint_handler(signum, frame):
raise KeyboardInterrupt
signal.signal(signal.SIGINT, custom_sigint_handler)
# Function to get Twitch stream URL using Streamlink
def get_twitch_stream_url(username):
try:
session = streamlink.Streamlink()
streams = session.streams(f"https://www.twitch.tv/{username}")
if 'best' in streams:
stream_url = streams['best'].url
logging.info(f"Stream URL: {stream_url}")
return stream_url
else:
logging.info("No playable streams found.")
return None
except Exception as e:
logging.error(f"Error fetching Twitch stream URL: {e}")
return None
# Error handling function for GStreamer
def on_error(bus, msg, pipeline):
err, debug = msg.parse_error()
print(f"{time.ctime()} - GStreamer error: {err}, {debug}", flush=True)
logging.error(f"GStreamer error: {err}, {debug}")
if "not-linked" in str(err):
print(f"{time.ctime()} - Error: Element not linked properly", flush=True)
logging.error("Element not linked properly")
elif "capabilities" in str(err):
print(f"{time.ctime()} - Error: Element capabilities issue", flush=True)
logging.error("Element capabilities issue")
pipeline.set_state(Gst.State.NULL)
return
# Function to normalize audio
def normalize_audio(audio):
print(f"{time.ctime()} - Normalizing audio", flush=True)
try:
return librosa.util.normalize(audio)
except Exception as e:
print(f"{time.ctime()} - Error normalizing audio: {e}", flush=True)
return audio # Return unnormalized audio in case of error
# Function to ensure finite values in audio buffer
def ensure_finite(audio):
print(f"{time.ctime()} - Ensuring finite values in audio buffer", flush=True)
try:
return np.nan_to_num(audio)
except Exception as e:
print(f"{time.ctime()} - Error ensuring finite values: {e}", flush=True)
return audio # Return original audio in case of error
# Function to perform emotion recognition
def perform_emotion_recognition(audio_data, model):
print(f"{time.ctime()} - Performing emotion recognition", flush=True)
try:
# Save the audio data to a temporary file
temp_wav_file = "temp_audio.wav"
librosa.output.write_wav(temp_wav_file, audio_data, RATE)
# Perform emotion recognition using the funasr model
res = model.generate(temp_wav_file, output_dir="./outputs", granularity="utterance", extract_embedding=False)
return res
except FileNotFoundError as e:
print(f"{time.ctime()} - FileNotFoundError: {e}", flush=True)
except librosa.util.exceptions.ParameterError as e:
print(f"{time.ctime()} - librosa ParameterError: {e}", flush=True)
except Exception as e:
print(f"{time.ctime()} - Failed to perform emotion recognition: {e}", flush=True)
return None
# Main function to stream and process audio
def stream_audio_from_twitch():
print(f"{time.ctime()} - Initializing GStreamer", flush=True)
Gst.init(None)
# Enable detailed debug output
os.environ["GST_DEBUG"] = "*:5"
print(f"{time.ctime()} - Enabled detailed GStreamer debug output", flush=True)
# Generate pipeline graphs
os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/home/ubuntu/dot"
print(f"{time.ctime()} - Set GST_DEBUG_DUMP_DOT_DIR to /home/ubuntu/dot", flush=True)
# Create GStreamer elements
elements = {
"pipeline": Gst.Pipeline(),
"source": Gst.ElementFactory.make("uridecodebin", "source"),
"audiorate": Gst.ElementFactory.make("audiorate", "audiorate"),
"convert": Gst.ElementFactory.make("audioconvert", "convert"),
"resample": Gst.ElementFactory.make("audioresample", "resample"),
"sink": Gst.ElementFactory.make("appsink", "sink"),
"queue": Gst.ElementFactory.make("queue", "queue")
}
# Check each element and print specific error messages
for name, element in elements.items():
if not element:
print(f"{time.ctime()} - Error: GStreamer {name} element could not be created", flush=True)
logging.error(f"GStreamer {name} element could not be created.")
else:
print(f"{time.ctime()} - GStreamer {name} element created successfully", flush=True)
logging.info(f"GStreamer {name} element created successfully")
# If any element is missing, return early
if not all(elements.values()):
print(f"{time.ctime()} - Error: GStreamer elements could not be created", flush=True)
logging.error("GStreamer elements could not be created.")
return
print(f"{time.ctime()} - Adding elements to GStreamer pipeline", flush=True)
pipeline = elements["pipeline"]
pipeline.set_state(Gst.State.NULL) # Ensure pipeline is in NULL state
for name in ["source", "queue", "audiorate", "convert", "resample", "sink"]:
if pipeline.add(elements[name]):
print(f"{time.ctime()} - Successfully added {name} to the pipeline", flush=True)
logging.info(f"Successfully added {name} to the pipeline")
else:
print(f"{time.ctime()} - Failed to add {name} to the pipeline", flush=True)
logging.error(f"Failed to add {name} to the pipeline")
return # Exit if any element fails to add
# Correct handling of dynamic pads
def on_pad_added(src, pad):
print(f"{time.ctime()} - New pad '{pad.get_name()}' added to '{src.get_name()}'", flush=True)
caps = pad.query_caps(None)
media_type = caps.to_string()
if "audio" in media_type:
print(f"{time.ctime()} - Pad '{pad.get_name()}' is an audio pad with media type '{media_type}'", flush=True)
sink_pad = elements["audiorate"].get_static_pad("sink")
if pad.link(sink_pad) != Gst.PadLinkReturn.OK:
print(f"{time.ctime()} - Failed to link pad '{pad.get_name()}' to audiorate sink pad", flush=True)
logging.error(f"Failed to link pad '{pad.get_name()}' to audiorate sink pad")
else:
print(f"{time.ctime()} - Successfully linked pad '{pad.get_name()}' to audiorate sink pad", flush=True)
else:
print(f"{time.ctime()} - Pad '{pad.get_name()}' is not an audio pad, media type: '{media_type}'", flush=True)
logging.info(f"Pad '{pad.get_name()}' is not an audio pad, media type: '{media_type}'")
pad.set_active(False) # Deactivate the pad to ignore video
elements["source"].connect("pad-added", on_pad_added)
print(f"{time.ctime()} - Setting Twitch stream URL", flush=True)
stream_url = get_twitch_stream_url(USERNAME)
if stream_url:
elements["source"].set_property("uri", stream_url)
else:
print(f"{time.ctime()} - Failed to set Twitch stream URL", flush=True)
return
elements["sink"].set_property("emit-signals", True)
print(f"{time.ctime()} - Adding signal watch to GStreamer bus", flush=True)
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message::error", on_error, pipeline)
def on_state_changed(bus, msg):
if msg.type == Gst.MessageType.STATE_CHANGED:
old_state, new_state, pending_state = msg.parse_state_changed()
print(f"{time.ctime()} - Pipeline state changed from {old_state} to {new_state} (pending: {pending_state})", flush=True)
logging.info(f"Pipeline state changed from {old_state} to {new_state} (pending: {pending_state})")
if new_state == Gst.State.PLAYING:
print(f"{time.ctime()} - Pipeline is now in PLAYING state", flush=True)
bus.connect("message::state-changed", on_state_changed)
def on_element_message(bus, msg):
if msg.type == Gst.MessageType.ELEMENT:
structure = msg.get_structure()
if structure:
logging.info(f"Element message: {structure.to_string()}")
print(f"{time.ctime()} - Element message: {structure.to_string()}", flush=True)
bus.connect("message::element", on_element_message)
def on_buffering(bus, msg):
if msg.type == Gst.MessageType.BUFFERING:
percent = msg.parse_buffering()
logging.info(f"Buffering {percent}%")
print(f"{time.ctime()} - Buffering {percent}%", flush=True)
bus.connect("message::buffering", on_buffering)
print(f"{time.ctime()} - Setting GStreamer pipeline to PLAYING state", flush=True)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print(f"{time.ctime()} - Failed to set pipeline to PLAYING state", flush=True)
# Capture detailed error messages
bus.poll(Gst.MessageType.ERROR, Gst.CLOCK_TIME_NONE)
return
elif ret == Gst.StateChangeReturn.ASYNC:
print(f"{time.ctime()} - Pipeline state change is asynchronous", flush=True)
# Wait for the state change to complete
state, current, pending = pipeline.get_state(Gst.CLOCK_TIME_NONE)
print(f"{time.ctime()} - Pipeline state: {current}, Pending state: {pending}", flush=True)
if current != Gst.State.PLAYING:
print(f"{time.ctime()} - Pipeline failed to reach PLAYING state", flush=True)
return
print(f"{time.ctime()} - Pipeline is now in PLAYING state", flush=True)
audio_data = bytearray()
print(f"{time.ctime()} - Loading emotion recognition model", flush=True)
try:
model = AutoModel(model="iic/emotion2vec_plus_large")
except torch.cuda.CudaError as e:
print(f"{time.ctime()} - CUDA error while loading model: {e}", flush=True)
return
except Exception as e:
print(f"{time.ctime()} - Failed to load emotion recognition model: {e}", flush=True)
return
last_sample_time = None
def on_new_sample(sink):
nonlocal audio_data, last_sample_time
print(f"{time.ctime()} - New audio sample received", flush=True)
try:
start_time = time.time()
sample = sink.emit("pull-sample")
if not sample:
print(f"{time.ctime()} - No sample received", flush=True)
return Gst.FlowReturn.ERROR
buf = sample.get_buffer()
data = buf.extract_dup(0, buf.get_size())
audio_data.extend(data)
current_time = time.time()
if last_sample_time is not None:
interval = current_time - last_sample_time
print(f"Time between samples: {interval:.6f} seconds", flush=True)
last_sample_time = current_time
except Exception as e:
print(f"{time.ctime()} - Error extracting audio sample: {e}", flush=True)
return Gst.FlowReturn.ERROR
if len(audio_data) >= RATE * 5:
print(f"{time.ctime()} - Processing 5 seconds of audio data", flush=True)
try:
audio_array = np.frombuffer(audio_data, dtype=np.float32)
audio_array = ensure_finite(audio_array)
audio_array = normalize_audio(audio_array)
predictions = perform_emotion_recognition(audio_array, model)
if predictions:
print(f"{time.ctime()} - Emotion predictions: {predictions}", flush=True)
else:
print(f"{time.ctime()} - Failed to get emotion predictions", flush=True)
audio_data = bytearray()
except Exception as e:
print(f"{time.ctime()} - Error processing audio data: {e}", flush=True)
return Gst.FlowReturn.ERROR
return Gst.FlowReturn.OK
print(f"{time.ctime()} - Connecting new-sample signal to on_new_sample function", flush=True)
elements["sink"].connect("new-sample", on_new_sample)
print(f"{time.ctime()} - Starting GLib MainLoop", flush=True)
loop = GLib.MainLoop()
try:
loop.run()
except KeyboardInterrupt:
print(f"{time.ctime()} - KeyboardInterrupt received, stopping GStreamer pipeline", flush=True)
finally:
print(f"{time.ctime()} - Setting GStreamer pipeline to NULL state", flush=True)
pipeline.set_state(Gst.State.NULL)
if __name__ == "__main__":
print(f"{time.ctime()} - Starting script", flush=True)
try:
stream_audio_from_twitch()
except KeyboardInterrupt:
print(f"{time.ctime()} - Script interrupted by user", flush=True)
except Exception as e:
print(f"{time.ctime()} - Unexpected error: {e}", flush=True)
print(f"{time.ctime()} - Script finished", flush=True)type here
I have tried mostly focusing on hte gstreamer pipeline as i have never used gstreamer before but it could be possible that the problem lies elsewhere.
user24234264 is a new contributor to this site. Take care in asking for clarification, commenting, and answering.
Check out our Code of Conduct.