Combining threading and asyncio to process audio stream through a WebSocket connection

Overview

I have a server which has an an open WebSocket connection with one client application. This client application (android app) can send live microphone audio data. What the server has to do in response to receiving this data is to reply with partial transcriptions so that the user can see what they are saying be transcribed in real time. I use Google Speech-to-text API for this.

I am also well aware that android has a built in speech recogniser which achieves exactly this.

The server is launched using asyncio.run and incoming data is passed to handlers which all use asynchronous methods. These are the methods that are given the responsibility of handling the reception of an audio frame:

elif action == util.ActionMessages.AUDIO_FRAME:
    audio_id, audio = content["id"], content["audio"]
    await self._audio_handler.receive_audio(audio, audio_id)


# Audio handler method
class AudioHandler:
    def __init__(self, client_handler: ClientHandler):
        self._client_handler = client_handler


        self._audio_finished = dict()

        self._is_streaming = False
        self._audio_queue = queue.Queue()
        self._languages = "en-US"

        self._speech_client = speech.SpeechClient()
        config = speech.RecognitionConfig(...)
        self._streaming_config = speech.StreamingRecognitionConfig(...)

        self._executor = ThreadPoolExecutor(max_workers=1)
        self._request_built = False


    async def receive_audio(self, content: str | None, audio_id: str):

        is_audio_complete = self._audio_finished.setdefault(audio_id, False)
        if content and not is_audio_complete:
            self._is_streaming = True
            content = base64.b64decode(content)
            self._audio_queue.put(content)

            if not self._request_built:
                future = self._executor.submit(self._build_requests)
                future.add_done_callback(lambda f: self._on_audio_processing_complete(f, audio_id))
                self._request_built = True

        elif is_audio_complete:
            # TODO: Implement audio processing complete like clean up dictionary
            pass

        else:
            self._request_built = False
            self._is_streaming = False
            self._audio_queue.put(None)



    def _on_audio_processing_complete(self, future, audio_id):
        self._audio_finished[audio_id] = True
        self._request_built = False

    def _read_audio(self):
        while self._is_streaming:
            chunk = self._audio_queue.get()
            if chunk is None:
                return
            data = [chunk]

            while True:
                try:
                    chunk = self._audio_queue.get_nowait()
                    if chunk is None:
                        return
                    data.append(chunk)
                except queue.Empty:
                    break

            yield b"".join(data)

    def _listen_print_loop(self, responses):
        num_chars_printed = 0
        for response in responses:
            if not response.results:
                continue

            result = response.results[0]
            if not result.alternatives:
                continue

            transcript = result.alternatives[0].transcript

            overwrite_chars = " " * (num_chars_printed - len(transcript))

            # Send transcript clients
            print(transcript + overwrite_chars)

            if not result.is_final:
                num_chars_printed = len(transcript)

            else:
                self._is_streaming = False
                return

    def _build_requests(self):
        audio_generator = self._read_audio()
        requests = (
            speech.StreamingRecognizeRequest(audio_content=content)
            for content in audio_generator
        )

        responses = self._speech_client.streaming_recognize(self._streaming_config, requests)
        self._listen_print_loop(responses)

When audio comes in with the utterance id (audio id) it fills up a queue. On first arrival a new thread is launched which instantiates a generator which reads audio samples from the queue and converts them to the appropriate type. The google speech client uses this generator to perform the transcription. This speech client returns a responses generator which is the used by the _listen_print_loop method to (for now) print the responses/transcriptions.

The logic for using the Google speech api is largely based on their docs.

The issue

As you can imagine, printing the transcription server side is not what I want. I would like to send these partial transcriptions to my client application. However, the method I use to send messages through the socket is async and thus in this implementation cannot be sent from the _listen_print_loop method as it itself is not async. Here is what I mean:

def _listen_print_loop(self, responses):
        num_chars_printed = 0
        for response in responses:
            if not response.results:
                continue

            result = response.results[0]
            if not result.alternatives:
                continue

            transcript = result.alternatives[0].transcript

            overwrite_chars = " " * (num_chars_printed - len(transcript))

            # Cannot do this!
            await send_to_client(transcript + overwrite_chars)

            if not result.is_final:
                num_chars_printed = len(transcript)

            else:
                self._is_streaming = False
                return

I would like to know what the best solution for this is. Is it switching from using threading to only asyncio? If so, would that not mean I would have to implement an async generator function? Would that not cause issues with the speech client?

I am relatively new to asyncio, any pointers would be greatly appreciated!

Edit: Using asyncio.run_coroutine_threadsafe()

I have tried the following to no avail:

def _listen_print_loop(self, responses):
        num_chars_printed = 0
        for response in responses:
            if not response.results:
                continue

            result = response.results[0]
            if not result.alternatives:
                continue

            transcript = result.alternatives[0].transcript

            overwrite_chars = " " * (num_chars_printed - len(transcript))

            # Send on current even loop passed to client handler
            print(transcript + overwrite_chars)
            asyncio.run_coroutine_threadsafe(
                send_to_clients(transcript + overwrite_chars),
                self._client_handler.loop,
            )

            if not result.is_final:
                num_chars_printed = len(transcript)

            else:
                self._is_streaming = False
                return

In this change I use asyncio’s asyncio.run_coroutine_threadsafe() to run the coroutine send_to_clients. The loop variable is set on launch as follows:

async def launch_server():
    # For threads
    client_handler.loop = asyncio.get_running_loop()

    ip_address = "0.0.0.0"
    port = int(os.getenv("PORT"))
    server = await websockets.serve(
        websocket_server,
        ip_address,
        port,
        process_request=health_check
    )
   
    await asyncio.shield(server.wait_closed())


if __name__ == "__main__":
    asyncio.run(launch_server())

This solution does not work as when it reaches the line to send the transcript the WebSocket closes with the following exception:

WebSocket connection closed: no close frame received or sent.

Edit: Using speech.SpeechAsyncClient

I have discovered the the speech module offers the speech.SpeechAsyncClient (link) which I have used as follows:

async def receive_audio(self, content: str | None, audio_id: str):
    is_audio_complete = self._audio_finished.setdefault(audio_id, False)
    if content and not is_audio_complete:
        self._is_streaming = True
        content = base64.b64decode(content)
        await self._audio_queue.put(content)

        if not self._request_built:
            self._request_built = True
            await self._build_requests()

    elif is_audio_complete:
        pass

    else:
        self._request_built = False
        self._is_streaming = False
        await self._audio_queue.put(None)

async def _read_audio(self):
    print("Reading audio")

    config_request = speech.StreamingRecognizeRequest()
    config_request.streaming_config = self._streaming_config
    yield config_request

    while self._is_streaming:
        chunk = await self._audio_queue.get()
        if chunk is None:
            return
        data = [chunk]

        while True:
            try:
                chunk = await self._audio_queue.get_nowait()
                if chunk is None:
                    return
                data.append(chunk)
            except queue.Empty:
                break

        request = speech.StreamingRecognizeRequest()
        request.audio_content = b"".join(data)
        yield request

async def _build_requests(self):
    audio_generator = self._read_audio()
    responses = await self._speech_client.streaming_recognize(
        requests=audio_generator,
    )
    print("Listening for audio")
    await self._listen_print_loop(responses)

This does not cause any error, however, for some reason the program hangs when await the streaming_recognize(...) method. More specifically, the generator _read_audio() is never called meaning no audio is ever processed.

Edit 2.1: Forgot to mention that asyncio.Queue is being used here

Edit 2.2: I have implemented the feature in a scratch file using this method (and my microphone directly) and it works. The issue here is still that the generator is never called (print(“Reading audio”) never reached. This leads me to believe it is the way I am handling asyncio.

Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa Dịch vụ tổ chức sự kiện 5 sao Thông tin về chúng tôi Dịch vụ sinh nhật bé trai Dịch vụ sinh nhật bé gái Sự kiện trọn gói Các tiết mục giải trí Dịch vụ bổ trợ Tiệc cưới sang trọng Dịch vụ khai trương Tư vấn tổ chức sự kiện Hình ảnh sự kiện Cập nhật tin tức Liên hệ ngay Thuê chú hề chuyên nghiệp Tiệc tất niên cho công ty Trang trí tiệc cuối năm Tiệc tất niên độc đáo Sinh nhật bé Hải Đăng Sinh nhật đáng yêu bé Khánh Vân Sinh nhật sang trọng Bích Ngân Tiệc sinh nhật bé Thanh Trang Dịch vụ ông già Noel Xiếc thú vui nhộn Biểu diễn xiếc quay đĩa Dịch vụ tổ chức tiệc uy tín Khám phá dịch vụ của chúng tôi Tiệc sinh nhật cho bé trai Trang trí tiệc cho bé gái Gói sự kiện chuyên nghiệp Chương trình giải trí hấp dẫn Dịch vụ hỗ trợ sự kiện Trang trí tiệc cưới đẹp Khởi đầu thành công với khai trương Chuyên gia tư vấn sự kiện Xem ảnh các sự kiện đẹp Tin mới về sự kiện Kết nối với đội ngũ chuyên gia Chú hề vui nhộn cho tiệc sinh nhật Ý tưởng tiệc cuối năm Tất niên độc đáo Trang trí tiệc hiện đại Tổ chức sinh nhật cho Hải Đăng Sinh nhật độc quyền Khánh Vân Phong cách tiệc Bích Ngân Trang trí tiệc bé Thanh Trang Thuê dịch vụ ông già Noel chuyên nghiệp Xem xiếc khỉ đặc sắc Xiếc quay đĩa thú vị
Trang chủ Giới thiệu Sinh nhật bé trai Sinh nhật bé gái Tổ chức sự kiện Biểu diễn giải trí Dịch vụ khác Trang trí tiệc cưới Tổ chức khai trương Tư vấn dịch vụ Thư viện ảnh Tin tức - sự kiện Liên hệ Chú hề sinh nhật Trang trí YEAR END PARTY công ty Trang trí tất niên cuối năm Trang trí tất niên xu hướng mới nhất Trang trí sinh nhật bé trai Hải Đăng Trang trí sinh nhật bé Khánh Vân Trang trí sinh nhật Bích Ngân Trang trí sinh nhật bé Thanh Trang Thuê ông già Noel phát quà Biểu diễn xiếc khỉ Xiếc quay đĩa
Thiết kế website Thiết kế website Thiết kế website Cách kháng tài khoản quảng cáo Mua bán Fanpage Facebook Dịch vụ SEO Tổ chức sinh nhật