Audio Speech Emotion Recognition Pipeline either freezing or crashing due to a broken pipe

I've tried to rewrite this early draft of the script multiple times and keep ending up with the script either freezing or crashing due to a broken pipe.

With the current implementation it freezes at the point shown in the log statements below, after which the script makes no further progress.

The full script and log statements are attached below.

Tue Aug 6 20:53:07 2024 - Setting up CUDA environment variable for GPU processing
Tue Aug 6 20:53:07 2024 - Setting GST_PLUGIN_PATH to /usr/lib/x86_64-linux-gnu/gstreamer-1.0
Tue Aug 6 20:53:07 2024 - Starting script
Tue Aug 6 20:53:07 2024 - Initializing GStreamer
Tue Aug 6 20:53:10 2024 - Enabled detailed GStreamer debug output
Tue Aug 6 20:53:10 2024 - Set GST_DEBUG_DUMP_DOT_DIR to /home/ubuntu/dot
Tue Aug 6 20:53:10 2024 - GStreamer pipeline element created successfully
Tue Aug 6 20:53:10 2024 - GStreamer source element created successfully
Tue Aug 6 20:53:10 2024 - GStreamer audiorate element created successfully
Tue Aug 6 20:53:10 2024 - GStreamer convert element created successfully
Tue Aug 6 20:53:10 2024 - GStreamer resample element created successfully
Tue Aug 6 20:53:10 2024 - GStreamer sink element created successfully
Tue Aug 6 20:53:10 2024 - GStreamer queue element created successfully
Tue Aug 6 20:53:10 2024 - Adding elements to GStreamer pipeline
Tue Aug 6 20:53:10 2024 - Successfully added source to the pipeline
Tue Aug 6 20:53:10 2024 - Successfully added queue to the pipeline
Tue Aug 6 20:53:10 2024 - Successfully added audiorate to the pipeline
Tue Aug 6 20:53:10 2024 - Successfully added convert to the pipeline
Tue Aug 6 20:53:10 2024 - Successfully added resample to the pipeline
Tue Aug 6 20:53:10 2024 - Successfully added sink to the pipeline
Tue Aug 6 20:53:10 2024 - Setting Twitch stream URL
Tue Aug 6 20:53:10 2024 - Adding signal watch to GStreamer bus
Tue Aug 6 20:53:10 2024 - Setting GStreamer pipeline to PLAYING state
Tue Aug 6 20:53:10 2024 - Pipeline state change is asynchronous
Tue Aug 6 20:53:12 2024 - New pad 'src_0' added to 'source'
Tue Aug 6 20:53:12 2024 - Pad 'src_0' is not an audio pad, media type: 'video/x-raw, width=(int)[ 48, 4096 ], height=(int)[ 16, 4096 ], framerate=(fraction)[ 0/1, 2147483647/1 ], format=(string){ NV12 }; video/x-raw(memory:GLMemory), width=(int)[ 48, 4096 ], height=(int)[ 16, 4096 ], framerate=(fraction)[ 0/1, 2147483647/1 ], format=(string){ NV12 }; video/x-raw(memory:CUDAMemory), width=(int)[ 48, 4096 ], height=(int)[ 16, 4096 ], framerate=(fraction)[ 0/1, 2147483647/1 ], format=(string){ NV12 }'
Tue Aug 6 20:53:12 2024 - New pad 'src_1' added to 'source'
Tue Aug 6 20:53:12 2024 - Pad 'src_1' is an audio pad with media type 'audio/x-raw, format=(string)S16LE, layout=(string)interleaved, rate=(int)[ 8000, 96000 ], channels=(int)[ 1, 8 ]'
Tue Aug 6 20:53:12 2024 - Successfully linked pad 'src_1' to audiorate sink pad

The script freezes at this point.

import subprocess
import numpy as np
import librosa
import soundfile as sf  # used for writing the temporary WAV file
import logging
import os
import torch
from funasr import AutoModel
import signal
import time
import streamlink
import gi

# GStreamer setup
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

# Constants
RATE = 16000
CHUNK = 1024
USERNAME = 'twitch_streamer_username'
MAX_RETRIES = 5
CHECKPOINT_DIR = './checkpoints'

# Twitch API credentials
CLIENT_ID = 'xxxxxxxxxxxxx'
CLIENT_SECRET = 'xxxxxxxxxxxxxx'

# Logging configuration
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

def custom_sigpipe_handler(signum, frame):
    print(f"{time.ctime()} - SIGPIPE received: Broken pipe error", flush=True)
    logging.error("SIGPIPE received: Broken pipe error")

# Set up signal handling for SIGPIPE
signal.signal(signal.SIGPIPE, custom_sigpipe_handler)

# Ensure proper CUDA environment variable is set for GPU processing
os.environ["CUDA_VISIBLE_DEVICES"] = "0"
print(f"{time.ctime()} - Setting up CUDA environment variable for GPU processing", flush=True)

# GStreamer Plugin Pathway
plugin_path = "/usr/lib/x86_64-linux-gnu/gstreamer-1.0"
os.environ["GST_PLUGIN_PATH"] = plugin_path
print(f"{time.ctime()} - Setting GST_PLUGIN_PATH to {plugin_path}", flush=True)

# Custom SIGINT handler
def custom_sigint_handler(signum, frame):
    raise KeyboardInterrupt

signal.signal(signal.SIGINT, custom_sigint_handler)

# Function to get Twitch stream URL using Streamlink
def get_twitch_stream_url(username):
    try:
        session = streamlink.Streamlink()
        streams = session.streams(f"https://www.twitch.tv/{username}")
        if 'best' in streams:
            stream_url = streams['best'].url
            logging.info(f"Stream URL: {stream_url}")
            return stream_url
        else:
            logging.info("No playable streams found.")
            return None
    except Exception as e:
        logging.error(f"Error fetching Twitch stream URL: {e}")
        return None

# Error handling function for GStreamer
def on_error(bus, msg, pipeline):
    err, debug = msg.parse_error()
    print(f"{time.ctime()} - GStreamer error: {err}, {debug}", flush=True)
    logging.error(f"GStreamer error: {err}, {debug}")
    if "not-linked" in str(err):
        print(f"{time.ctime()} - Error: Element not linked properly", flush=True)
        logging.error("Element not linked properly")
    elif "capabilities" in str(err):
        print(f"{time.ctime()} - Error: Element capabilities issue", flush=True)
        logging.error("Element capabilities issue")
    pipeline.set_state(Gst.State.NULL)
    return

# Function to normalize audio
def normalize_audio(audio):
    print(f"{time.ctime()} - Normalizing audio", flush=True)
    try:
        return librosa.util.normalize(audio)
    except Exception as e:
        print(f"{time.ctime()} - Error normalizing audio: {e}", flush=True)
        return audio  # Return unnormalized audio in case of error

# Function to ensure finite values in audio buffer
def ensure_finite(audio):
    print(f"{time.ctime()} - Ensuring finite values in audio buffer", flush=True)
    try:
        return np.nan_to_num(audio)
    except Exception as e:
        print(f"{time.ctime()} - Error ensuring finite values: {e}", flush=True)
        return audio  # Return original audio in case of error

# Function to perform emotion recognition
def perform_emotion_recognition(audio_data, model):
    print(f"{time.ctime()} - Performing emotion recognition", flush=True)
    try:
        # Save the audio data to a temporary file
        # (librosa.output.write_wav was removed in librosa 0.8, so write with soundfile instead)
        temp_wav_file = "temp_audio.wav"
        sf.write(temp_wav_file, audio_data, RATE)
        # Perform emotion recognition using the funasr model
        res = model.generate(temp_wav_file, output_dir="./outputs", granularity="utterance", extract_embedding=False)
        return res
    except FileNotFoundError as e:
        print(f"{time.ctime()} - FileNotFoundError: {e}", flush=True)
    except librosa.util.exceptions.ParameterError as e:
        print(f"{time.ctime()} - librosa ParameterError: {e}", flush=True)
    except Exception as e:
        print(f"{time.ctime()} - Failed to perform emotion recognition: {e}", flush=True)
    return None

# Main function to stream and process audio
def stream_audio_from_twitch():
    print(f"{time.ctime()} - Initializing GStreamer", flush=True)
    Gst.init(None)

    # Enable detailed debug output
    os.environ["GST_DEBUG"] = "*:5"
    print(f"{time.ctime()} - Enabled detailed GStreamer debug output", flush=True)

    # Generate pipeline graphs
    os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/home/ubuntu/dot"
    print(f"{time.ctime()} - Set GST_DEBUG_DUMP_DOT_DIR to /home/ubuntu/dot", flush=True)

    # Create GStreamer elements
    elements = {
        "pipeline": Gst.Pipeline(),
        "source": Gst.ElementFactory.make("uridecodebin", "source"),
        "audiorate": Gst.ElementFactory.make("audiorate", "audiorate"),
        "convert": Gst.ElementFactory.make("audioconvert", "convert"),
        "resample": Gst.ElementFactory.make("audioresample", "resample"),
        "sink": Gst.ElementFactory.make("appsink", "sink"),
        "queue": Gst.ElementFactory.make("queue", "queue")
    }

    # Check each element and print specific error messages
    for name, element in elements.items():
        if not element:
            print(f"{time.ctime()} - Error: GStreamer {name} element could not be created", flush=True)
            logging.error(f"GStreamer {name} element could not be created.")
        else:
            print(f"{time.ctime()} - GStreamer {name} element created successfully", flush=True)
            logging.info(f"GStreamer {name} element created successfully")

    # If any element is missing, return early
    if not all(elements.values()):
        print(f"{time.ctime()} - Error: GStreamer elements could not be created", flush=True)
        logging.error("GStreamer elements could not be created.")
        return

    print(f"{time.ctime()} - Adding elements to GStreamer pipeline", flush=True)
    pipeline = elements["pipeline"]
    pipeline.set_state(Gst.State.NULL)  # Ensure pipeline is in NULL state

    for name in ["source", "queue", "audiorate", "convert", "resample", "sink"]:
        if pipeline.add(elements[name]):
            print(f"{time.ctime()} - Successfully added {name} to the pipeline", flush=True)
            logging.info(f"Successfully added {name} to the pipeline")
        else:
            print(f"{time.ctime()} - Failed to add {name} to the pipeline", flush=True)
            logging.error(f"Failed to add {name} to the pipeline")
            return  # Exit if any element fails to add

    # Correct handling of dynamic pads
    def on_pad_added(src, pad):
        print(f"{time.ctime()} - New pad '{pad.get_name()}' added to '{src.get_name()}'", flush=True)
        caps = pad.query_caps(None)
        media_type = caps.to_string()
        if "audio" in media_type:
            print(f"{time.ctime()} - Pad '{pad.get_name()}' is an audio pad with media type '{media_type}'", flush=True)
            sink_pad = elements["audiorate"].get_static_pad("sink")
            if pad.link(sink_pad) != Gst.PadLinkReturn.OK:
                print(f"{time.ctime()} - Failed to link pad '{pad.get_name()}' to audiorate sink pad", flush=True)
                logging.error(f"Failed to link pad '{pad.get_name()}' to audiorate sink pad")
            else:
                print(f"{time.ctime()} - Successfully linked pad '{pad.get_name()}' to audiorate sink pad", flush=True)
        else:
            print(f"{time.ctime()} - Pad '{pad.get_name()}' is not an audio pad, media type: '{media_type}'", flush=True)
            logging.info(f"Pad '{pad.get_name()}' is not an audio pad, media type: '{media_type}'")
            pad.set_active(False)  # Deactivate the pad to ignore video

    elements["source"].connect("pad-added", on_pad_added)

    print(f"{time.ctime()} - Setting Twitch stream URL", flush=True)
    stream_url = get_twitch_stream_url(USERNAME)
    if stream_url:
        elements["source"].set_property("uri", stream_url)
    else:
        print(f"{time.ctime()} - Failed to set Twitch stream URL", flush=True)
        return

    elements["sink"].set_property("emit-signals", True)

    print(f"{time.ctime()} - Adding signal watch to GStreamer bus", flush=True)
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message::error", on_error, pipeline)

    def on_state_changed(bus, msg):
        if msg.type == Gst.MessageType.STATE_CHANGED:
            old_state, new_state, pending_state = msg.parse_state_changed()
            print(f"{time.ctime()} - Pipeline state changed from {old_state} to {new_state} (pending: {pending_state})", flush=True)
            logging.info(f"Pipeline state changed from {old_state} to {new_state} (pending: {pending_state})")
            if new_state == Gst.State.PLAYING:
                print(f"{time.ctime()} - Pipeline is now in PLAYING state", flush=True)

    bus.connect("message::state-changed", on_state_changed)

    def on_element_message(bus, msg):
        if msg.type == Gst.MessageType.ELEMENT:
            structure = msg.get_structure()
            if structure:
                logging.info(f"Element message: {structure.to_string()}")
                print(f"{time.ctime()} - Element message: {structure.to_string()}", flush=True)

    bus.connect("message::element", on_element_message)

    def on_buffering(bus, msg):
        if msg.type == Gst.MessageType.BUFFERING:
            percent = msg.parse_buffering()
            logging.info(f"Buffering {percent}%")
            print(f"{time.ctime()} - Buffering {percent}%", flush=True)

    bus.connect("message::buffering", on_buffering)

    print(f"{time.ctime()} - Setting GStreamer pipeline to PLAYING state", flush=True)
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        print(f"{time.ctime()} - Failed to set pipeline to PLAYING state", flush=True)
        # Capture detailed error messages
        bus.poll(Gst.MessageType.ERROR, Gst.CLOCK_TIME_NONE)
        return
    elif ret == Gst.StateChangeReturn.ASYNC:
        print(f"{time.ctime()} - Pipeline state change is asynchronous", flush=True)
        # Wait for the state change to complete
        state, current, pending = pipeline.get_state(Gst.CLOCK_TIME_NONE)
        print(f"{time.ctime()} - Pipeline state: {current}, Pending state: {pending}", flush=True)
        if current != Gst.State.PLAYING:
            print(f"{time.ctime()} - Pipeline failed to reach PLAYING state", flush=True)
            return
    print(f"{time.ctime()} - Pipeline is now in PLAYING state", flush=True)

    audio_data = bytearray()

    print(f"{time.ctime()} - Loading emotion recognition model", flush=True)
    try:
        model = AutoModel(model="iic/emotion2vec_plus_large")
    except torch.cuda.CudaError as e:
        print(f"{time.ctime()} - CUDA error while loading model: {e}", flush=True)
        return
    except Exception as e:
        print(f"{time.ctime()} - Failed to load emotion recognition model: {e}", flush=True)
        return

    last_sample_time = None

    def on_new_sample(sink):
        nonlocal audio_data, last_sample_time
        print(f"{time.ctime()} - New audio sample received", flush=True)
        try:
            start_time = time.time()
            sample = sink.emit("pull-sample")
            if not sample:
                print(f"{time.ctime()} - No sample received", flush=True)
                return Gst.FlowReturn.ERROR

            buf = sample.get_buffer()
            data = buf.extract_dup(0, buf.get_size())
            audio_data.extend(data)

            current_time = time.time()
            if last_sample_time is not None:
                interval = current_time - last_sample_time
                print(f"Time between samples: {interval:.6f} seconds", flush=True)
            last_sample_time = current_time

        except Exception as e:
            print(f"{time.ctime()} - Error extracting audio sample: {e}", flush=True)
            return Gst.FlowReturn.ERROR

        if len(audio_data) >= RATE * 5:
            print(f"{time.ctime()} - Processing 5 seconds of audio data", flush=True)
            try:
                audio_array = np.frombuffer(audio_data, dtype=np.float32)
                audio_array = ensure_finite(audio_array)
                audio_array = normalize_audio(audio_array)
                predictions = perform_emotion_recognition(audio_array, model)
                if predictions:
                    print(f"{time.ctime()} - Emotion predictions: {predictions}", flush=True)
                else:
                    print(f"{time.ctime()} - Failed to get emotion predictions", flush=True)
                audio_data = bytearray()
            except Exception as e:
                print(f"{time.ctime()} - Error processing audio data: {e}", flush=True)
                return Gst.FlowReturn.ERROR

        return Gst.FlowReturn.OK

    print(f"{time.ctime()} - Connecting new-sample signal to on_new_sample function", flush=True)
    elements["sink"].connect("new-sample", on_new_sample)

    print(f"{time.ctime()} - Starting GLib MainLoop", flush=True)
    loop = GLib.MainLoop()
    try:
        loop.run()
    except KeyboardInterrupt:
        print(f"{time.ctime()} - KeyboardInterrupt received, stopping GStreamer pipeline", flush=True)
    finally:
        print(f"{time.ctime()} - Setting GStreamer pipeline to NULL state", flush=True)
        pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    print(f"{time.ctime()} - Starting script", flush=True)
    try:
        stream_audio_from_twitch()
    except KeyboardInterrupt:
        print(f"{time.ctime()} - Script interrupted by user", flush=True)
    except Exception as e:
        print(f"{time.ctime()} - Unexpected error: {e}", flush=True)
    print(f"{time.ctime()} - Script finished", flush=True)type here

I have mostly focused on the GStreamer pipeline, since I have never used GStreamer before, but it is possible that the problem lies elsewhere.
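One thing I am now wondering about: after adding the elements to the pipeline I never call link() on the static part of the chain, and the pad-added handler only links uridecodebin's audio pad to audiorate. If that is the problem, my untested guess (assuming the intended order is queue -> audiorate -> audioconvert -> audioresample -> appsink) would be something like the following, placed right after the elements are added:

    # Untested sketch: statically link the downstream chain once the elements
    # have been added to the pipeline. The dynamic uridecodebin pad would then
    # be linked to the queue's sink pad in on_pad_added instead of to audiorate.
    if not elements["queue"].link(elements["audiorate"]):
        logging.error("Failed to link queue -> audiorate")
    if not elements["audiorate"].link(elements["convert"]):
        logging.error("Failed to link audiorate -> convert")
    if not elements["convert"].link(elements["resample"]):
        logging.error("Failed to link convert -> resample")
    if not elements["resample"].link(elements["sink"]):
        logging.error("Failed to link resample -> sink")

Is that the missing piece, or is the freeze caused by something else?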
