diff --git a/README.md b/README.md
index 36ab957e..0ee92efc 100644
--- a/README.md
+++ b/README.md
@@ -411,9 +411,11 @@ communication_services:
resource_id: xxx
sms_queue_name: sms-33612345678
+# Must be of type "AI services multi-service account"
cognitive_service:
- # Must be of type "AI services multi-service account"
endpoint: https://xxx.cognitiveservices.azure.com
+ region: swedencentral
+ resource_id: xxx
llm:
fast:
@@ -632,10 +634,11 @@ Conversation options are represented as features. They can be configured from Ap
| `answer_hard_timeout_sec` | The hard timeout for the bot answer in seconds. | `int` | 180 |
| `answer_soft_timeout_sec` | The soft timeout for the bot answer in seconds. | `int` | 30 |
| `callback_timeout_hour` | The timeout for a callback in hours. | `int` | 3 |
-| `phone_silence_timeout_sec` | The timeout for phone silence in seconds. | `int` | 1 |
+| `recognition_retry_max` | The maximum number of retries for voice recognition. | `int` | 3 |
| `recording_enabled` | Whether call recording is enabled. | `bool` | false |
-| `slow_llm_for_chat` | Whether to use the slower LLM for chat. | `bool` | true |
-| `voice_recognition_retry_max` | The maximum number of retries for voice recognition. | `int` | 2 |
+| `slow_llm_for_chat` | Whether to use the slow LLM for chat. | `bool` | false |
+| `vad_silence_timeout_ms` | The timeout for phone silence in milliseconds. | `int` | 500 |
+| `vad_threshold` | The threshold for voice activity detection. | `float` | 0.5 |

### Use an OpenAI compatible model for the LLM

diff --git a/app/helpers/call_events.py b/app/helpers/call_events.py
index 5e603c5d..9b27c87c 100644
--- a/app/helpers/call_events.py
+++ b/app/helpers/call_events.py
@@ -4,6 +4,10 @@ from azure.communication.callautomation import (
AzureBlobContainerRecordingStorage,
DtmfTone,
+ MediaStreamingAudioChannelType,
+ MediaStreamingContentType,
+ MediaStreamingOptions,
+ MediaStreamingTransportType,
RecognitionChoice,
RecordingChannel,
RecordingContent,
@@ -16,15 +20,14 @@ from app.helpers.call_llm import load_llm_chat
from app.helpers.call_utils import (
ContextEnum as CallContextEnum,
- handle_clear_queue,
handle_hangup,
handle_play_text,
handle_recognize_ivr,
- handle_recognize_text,
handle_transfer,
+ start_audio_streaming,
)
from app.helpers.config import CONFIG
-from app.helpers.features import recording_enabled, voice_recognition_retry_max
+from app.helpers.features import recognition_retry_max, recording_enabled
from app.helpers.llm_worker import completion_sync
from app.helpers.logging import logger
from app.helpers.monitoring import CallAttributes, span_attribute, tracer
@@ -50,14 +53,24 @@ async def on_new_call(
client: CallAutomationClient,
incoming_context: str,
phone_number: str,
+ wss_url: str,
) -> bool:
logger.debug("Incoming call handler caller ID: %s", phone_number)
+ streaming_options = MediaStreamingOptions(
+ audio_channel_type=MediaStreamingAudioChannelType.UNMIXED,
+ content_type=MediaStreamingContentType.AUDIO,
+ start_media_streaming=False,
+ transport_type=MediaStreamingTransportType.WEBSOCKET,
+ transport_url=wss_url,
+ )
+
try:
answer_call_result = await client.answer_call(
callback_url=callback_url,
cognitive_services_endpoint=CONFIG.cognitive_service.endpoint,
incoming_call_context=incoming_context,
+ media_streaming=streaming_options,
)
logger.info(
"Answered call with %s (%s)",
@@ -89,9 +102,7 @@ async def on_call_connected(
call: CallStateModel,
client: CallAutomationClient,
- post_callback: Callable[[CallStateModel],
Awaitable[None]], server_call_id: str, - training_callback: Callable[[CallStateModel], Awaitable[None]], ) -> None: logger.info("Call connected, asking for language") call.recognition_retry = 0 # Reset recognition retry counter @@ -106,8 +117,6 @@ async def on_call_connected( _handle_ivr_language( call=call, client=client, - post_callback=post_callback, - training_callback=training_callback, ), # First, every time a call is answered, confirm the language _db.call_aset( call @@ -134,65 +143,49 @@ async def on_call_disconnected( ) -@tracer.start_as_current_span("on_speech_recognized") -async def on_speech_recognized( +@tracer.start_as_current_span("on_audio_connected") +async def on_audio_connected( # noqa: PLR0913 + audio_bits_per_sample: int, + audio_channels: int, + audio_sample_rate: int, + audio_stream: asyncio.Queue[bytes], call: CallStateModel, client: CallAutomationClient, post_callback: Callable[[CallStateModel], Awaitable[None]], - text: str, training_callback: Callable[[CallStateModel], Awaitable[None]], ) -> None: - logger.info("Voice recognition: %s", text) - span_attribute(CallAttributes.CALL_CHANNEL, "voice") - span_attribute(CallAttributes.CALL_MESSAGE, text) - call.messages.append( - MessageModel( - content=text, - persona=MessagePersonaEnum.HUMAN, - ) + await load_llm_chat( + audio_bits_per_sample=audio_bits_per_sample, + audio_channels=audio_channels, + audio_sample_rate=audio_sample_rate, + audio_stream=audio_stream, + automation_client=client, + call=call, + post_callback=post_callback, + training_callback=training_callback, ) - call.recognition_retry = 0 # Reset recognition retry counter - await asyncio.gather( - handle_clear_queue( - call=call, - client=client, - ), # First, when the user speak, the conversation should continue based on its last message - load_llm_chat( - call=call, - client=client, - post_callback=post_callback, - training_callback=training_callback, - ), # Second, the LLM should be loaded to continue the conversation - _db.call_aset( - call - ), # Third, save in DB allowing SMS responses to be more "in-sync" if they are sent during the generation - ) # All in parallel to lower the response latency @tracer.start_as_current_span("on_recognize_timeout_error") -async def on_recognize_timeout_error( +async def on_recognize_error( call: CallStateModel, client: CallAutomationClient, contexts: set[CallContextEnum] | None, - post_callback: Callable[[CallStateModel], Awaitable[None]], - training_callback: Callable[[CallStateModel], Awaitable[None]], ) -> None: if ( contexts and CallContextEnum.IVR_LANG_SELECT in contexts ): # Retry IVR recognition span_attribute(CallAttributes.CALL_CHANNEL, "ivr") - if call.recognition_retry < await voice_recognition_retry_max(): + if call.recognition_retry < await recognition_retry_max(): call.recognition_retry += 1 logger.info( "Timeout, retrying language selection (%s/%s)", call.recognition_retry, - await voice_recognition_retry_max(), + await recognition_retry_max(), ) await _handle_ivr_language( call=call, client=client, - post_callback=post_callback, - training_callback=training_callback, ) else: # IVR retries are exhausted, end call logger.info("Timeout, ending call") @@ -203,7 +196,7 @@ async def on_recognize_timeout_error( return if ( - call.recognition_retry >= await voice_recognition_retry_max() + call.recognition_retry >= await recognition_retry_max() ): # Voice retries are exhausted, end call logger.info("Timeout, ending call") await _handle_goodbye( @@ -212,23 +205,6 @@ async def on_recognize_timeout_error( ) 
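
The feature keys renamed in the README table above (`recognition_retry_max`, `vad_silence_timeout_ms`, `vad_threshold`) are resolved at runtime by `app/helpers/features.py` against Azure App Configuration. A minimal sketch of seeding them, assuming a store endpoint and `DefaultAzureCredential` access (neither is part of this diff):

```python
# Seed the renamed feature flags in Azure App Configuration (sketch).
# Values are stored as strings; _parse() in app/helpers/features.py casts them.
from azure.appconfiguration import AzureAppConfigurationClient, ConfigurationSetting
from azure.identity import DefaultAzureCredential

client = AzureAppConfigurationClient(
    base_url="https://my-config.azconfig.io",  # hypothetical store
    credential=DefaultAzureCredential(),
)
for key, value in {
    "recognition_retry_max": "3",
    "vad_silence_timeout_ms": "500",
    "vad_threshold": "0.5",
}.items():
    client.set_configuration_setting(ConfigurationSetting(key=key, value=value))
```
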
return - # Retry voice recognition - span_attribute(CallAttributes.CALL_CHANNEL, "voice") - call.recognition_retry += 1 - logger.info( - "Timeout, retrying voice recognition (%s/%s)", - call.recognition_retry, - await voice_recognition_retry_max(), - ) - # Never store the warning message in the call history, it has caused hallucinations in the LLM - await handle_recognize_text( - call=call, - client=client, - no_response_error=True, - store=False, - text=await CONFIG.prompts.tts.timeout_silence(call), - ) - async def _handle_goodbye( call: CallStateModel, @@ -243,33 +219,6 @@ async def _handle_goodbye( ) -@tracer.start_as_current_span("on_recognize_unknown_error") -async def on_recognize_unknown_error( - call: CallStateModel, - client: CallAutomationClient, - error_code: int, -) -> None: - span_attribute(CallAttributes.CALL_CHANNEL, "voice") - - if error_code == 8511: # noqa: PLR2004 - # Failure while trying to play the prompt - logger.warning("Failed to play prompt") - else: - logger.warning( - "Recognition failed with unknown error code %s, answering with default error", - error_code, - ) - - # Never store the error message in the call history, it has caused hallucinations in the LLM - await handle_recognize_text( - call=call, - client=client, - no_response_error=True, - store=False, - text=await CONFIG.prompts.tts.error(call), - ) - - @tracer.start_as_current_span("on_play_completed") async def on_play_completed( call: CallStateModel, @@ -308,22 +257,23 @@ async def on_play_error(error_code: int) -> None: logger.debug("Play failed") span_attribute(CallAttributes.CALL_CHANNEL, "voice") # See: https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/communication-services/how-tos/call-automation/play-action.md - if error_code == 8535: # noqa: PLR2004 - # Action failed, file format - logger.warning("Error during media play, file format is invalid") - elif error_code == 8536: # noqa: PLR2004 - # Action failed, file downloaded - logger.warning("Error during media play, file could not be downloaded") - elif error_code == 8565: # noqa: PLR2004 - # Action failed, AI services config - logger.error( - "Error during media play, impossible to connect with Azure AI services" - ) - elif error_code == 9999: # noqa: PLR2004 - # Unknown error code - logger.warning("Error during media play, unknown internal server error") - else: - logger.warning("Error during media play, unknown error code %s", error_code) + match error_code: + case 8535: + # Action failed, file format + logger.warning("Error during media play, file format is invalid") + case 8536: + # Action failed, file downloaded + logger.warning("Error during media play, file could not be downloaded") + case 8565: + # Action failed, AI services config + logger.error( + "Error during media play, impossible to connect with Azure AI services" + ) + case 9999: + # Unknown error code + logger.warning("Error during media play, unknown internal server error") + case _: + logger.warning("Error during media play, unknown error code %s", error_code) @tracer.start_as_current_span("on_ivr_recognized") @@ -331,8 +281,6 @@ async def on_ivr_recognized( call: CallStateModel, client: CallAutomationClient, label: str, - post_callback: Callable[[CallStateModel], Awaitable[None]], - training_callback: Callable[[CallStateModel], Awaitable[None]], ) -> None: logger.info("IVR recognized: %s", label) span_attribute(CallAttributes.CALL_CHANNEL, "ivr") @@ -353,35 +301,31 @@ async def on_ivr_recognized( if len(call.messages) <= 1: # First call, or only the call action 
await asyncio.gather( - handle_recognize_text( + handle_play_text( call=call, client=client, text=await CONFIG.prompts.tts.hello(call), ), # First, greet the user persist_coro, # Second, persist language change for next messages, should be quick enough to be in sync with the next message - load_llm_chat( + start_audio_streaming( call=call, client=client, - post_callback=post_callback, - training_callback=training_callback, - ), # Third, the LLM should be loaded to continue the conversation + ), # Third, the conversation with the LLM should start ) # All in parallel to lower the response latency else: # Returning call await asyncio.gather( - handle_recognize_text( + handle_play_text( call=call, client=client, style=MessageStyleEnum.CHEERFUL, text=await CONFIG.prompts.tts.welcome_back(call), ), # First, welcome back the user persist_coro, # Second, persist language change for next messages, should be quick enough to be in sync with the next message - load_llm_chat( + start_audio_streaming( call=call, client=client, - post_callback=post_callback, - training_callback=training_callback, - ), # Third, the LLM should be loaded to continue the conversation + ), # Third, the conversation with the LLM should start ) @@ -398,11 +342,10 @@ async def on_transfer_error( error_code: int, ) -> None: logger.info("Error during call transfer, subCode %s", error_code) - await handle_recognize_text( + await handle_play_text( call=call, client=client, context=CallContextEnum.TRANSFER_FAILED, - no_response_error=True, text=await CONFIG.prompts.tts.calltransfer_failure(call), ) @@ -410,10 +353,7 @@ async def on_transfer_error( @tracer.start_as_current_span("on_sms_received") async def on_sms_received( call: CallStateModel, - client: CallAutomationClient, message: str, - post_callback: Callable[[CallStateModel], Awaitable[None]], - training_callback: Callable[[CallStateModel], Awaitable[None]], ) -> bool: logger.info("SMS received from %s: %s", call.initiate.phone_number, message) span_attribute(CallAttributes.CALL_CHANNEL, "sms") @@ -430,12 +370,12 @@ async def on_sms_received( logger.info("Call not in progress, answering with SMS") else: logger.info("Call in progress, answering with voice") - await load_llm_chat( - call=call, - client=client, - post_callback=post_callback, - training_callback=training_callback, - ) + # TODO: Reimplement SMS answers in voice + # await load_llm_chat( + # call=call, + # client=client, + # post_callback=post_callback, + # ) return True @@ -501,7 +441,6 @@ def _validate(req: str | None) -> tuple[bool, str | None, str | None]: if not content: logger.warning("Error generating SMS report") return - logger.info("SMS report: %s", content) # Send the SMS to both the current caller and the policyholder success = False @@ -592,8 +531,6 @@ def _validate( async def _handle_ivr_language( call: CallStateModel, client: CallAutomationClient, - post_callback: Callable[[CallStateModel], Awaitable[None]], - training_callback: Callable[[CallStateModel], Awaitable[None]], ) -> None: # If only one language is available, skip the IVR if len(CONFIG.conversation.initiate.lang.availables) == 1: @@ -603,8 +540,6 @@ async def _handle_ivr_language( call=call, client=client, label=short_code, - post_callback=post_callback, - training_callback=training_callback, ) return diff --git a/app/helpers/call_llm.py b/app/helpers/call_llm.py index d9e2888c..8636a1a6 100644 --- a/app/helpers/call_llm.py +++ b/app/helpers/call_llm.py @@ -1,26 +1,41 @@ import asyncio -import time from collections.abc import Awaitable, 
Callable - +from functools import wraps + +import aiojobs +from azure.cognitiveservices.speech import ( + AudioConfig, + SpeechConfig, + SpeechRecognitionEventArgs, + SpeechRecognizer, +) +from azure.cognitiveservices.speech.audio import AudioStreamFormat, PushAudioInputStream from azure.communication.callautomation.aio import CallAutomationClient from openai import APIError +from pydub import AudioSegment from app.helpers.call_utils import ( handle_clear_queue, handle_media, - handle_recognize_text, + handle_play_text, tts_sentence_split, ) from app.helpers.config import CONFIG -from app.helpers.features import answer_hard_timeout_sec, answer_soft_timeout_sec -from app.helpers.llm_tools import LlmPlugins +from app.helpers.features import ( + answer_hard_timeout_sec, + answer_soft_timeout_sec, + vad_silence_timeout_ms, + vad_threshold, +) +from app.helpers.identity import token +from app.helpers.llm_tools import DefaultPlugin from app.helpers.llm_worker import ( MaximumTokensReachedError, SafetyCheckError, completion_stream, ) from app.helpers.logging import logger -from app.helpers.monitoring import tracer +from app.helpers.monitoring import CallAttributes, span_attribute, tracer from app.models.call import CallStateModel from app.models.message import ( ActionEnum as MessageAction, @@ -32,16 +47,156 @@ remove_message_action, ) -_cache = CONFIG.cache.instance() _db = CONFIG.database.instance() -# TODO: Refacto, this function is too long (and remove PLR0912/PLR0915 ignore) +# TODO: Refacto, this function is too long (and remove PLR0913 ignore) @tracer.start_as_current_span("call_load_llm_chat") -async def load_llm_chat( # noqa: PLR0912, PLR0915 +async def load_llm_chat( # noqa: PLR0913 + audio_bits_per_sample: int, + audio_channels: int, + audio_sample_rate: int, + audio_stream: asyncio.Queue[bytes], + automation_client: CallAutomationClient, + call: CallStateModel, + post_callback: Callable[[CallStateModel], Awaitable[None]], + training_callback: Callable[[CallStateModel], Awaitable[None]], +) -> None: + # Init language recognition + speech_token = await (await token("https://cognitiveservices.azure.com/.default"))() + recognizer_buffer: list[str] = [] + recognizer_lock = asyncio.Event() + recognizer_stream = PushAudioInputStream( + stream_format=AudioStreamFormat( + bits_per_sample=audio_bits_per_sample, + channels=audio_channels, + samples_per_second=audio_sample_rate, + ), + ) + recognizer_config = SpeechConfig( + auth_token=f"aad#{CONFIG.cognitive_service.resource_id}#{speech_token}", + region=CONFIG.cognitive_service.region, + ) + # recognizer_config.set_property(PropertyId.Speech_LogFilename, f"speech-{uuid4()}.log") + recognizer_client = SpeechRecognizer( + audio_config=AudioConfig(stream=recognizer_stream), + language=call.lang.short_code, + speech_config=recognizer_config, + ) + + def _handle_partial_recognition(event: SpeechRecognitionEventArgs) -> None: + text = event.result.text + # Skip if no text + if not text: + return + # Init buffer if empty + if not recognizer_buffer: + recognizer_buffer.append("") + # Replace last element by this update + recognizer_buffer[-1] = text + logger.debug("Partial recognition: %s", recognizer_buffer) + # Lock the recognition until the audio stream is ready + recognizer_lock.set() + + def _handle_complete_recognition(event: SpeechRecognitionEventArgs) -> None: + text = event.result.text + # Skip if no text + if not text: + return + # Replace last element by this update + recognizer_buffer[-1] = text + # Add a new element to the buffer, thus the 
next partial recognition will be in a new element + recognizer_buffer.append("") + logger.debug("Complete recognition: %s", recognizer_buffer) + + # Register callback and start recognition + recognizer_client.recognizing.connect(_handle_partial_recognition) + recognizer_client.recognized.connect(_handle_complete_recognition) + recognizer_client.session_started.connect( + lambda _: logger.debug("Recognition started") + ) + recognizer_client.session_stopped.connect( + lambda _: logger.debug("Recognition stopped") + ) + recognizer_client.canceled.connect( + lambda event: logger.warning("Recognition cancelled: %s", event) + ) + recognizer_client.start_continuous_recognition_async() + + # Build scheduler + last_response: aiojobs.Job | None = None + async with aiojobs.Scheduler() as scheduler: + + async def _clear_audio_callback() -> None: + # Wait for the recognition to be ready + await recognizer_lock.wait() + + # Clear the LLM queue + recognizer_buffer.clear() + + # Clear the TTS queue + await scheduler.spawn( + handle_clear_queue( + call=call, + client=automation_client, + ) + ) + + # Cancel the last response + if last_response: + # Wait 2 secs maximum for the task to end + await last_response.close(timeout=2) + + async def _response_callback() -> None: + # Wait for the recognition to be ready + await recognizer_lock.wait() + + # Skip if no recognition + if not recognizer_buffer or recognizer_buffer[-1] == "": + return + + # Add recognition to the call history + logger.info("Voice recognition: %s", recognizer_buffer) + call.messages.append( + MessageModel( + content=" ".join(recognizer_buffer), + persona=MessagePersonaEnum.HUMAN, + ) + ) + + # Add recognition to the call history + nonlocal last_response + last_response = await scheduler.spawn( + _out_answer( + call=call, + client=automation_client, + post_callback=post_callback, + scheduler=scheduler, + training_callback=training_callback, + ) + ) + + # Clear the LLM queue + recognizer_buffer.clear() + + await _in_audio( + bits_per_sample=audio_bits_per_sample, + channels=audio_channels, + clear_audio_callback=_clear_audio_callback, + in_stream=audio_stream, + out_stream=recognizer_stream, + response_callback=_response_callback, + sample_rate=audio_sample_rate, + ) + + +# TODO: Refacto, this function is too long (and remove PLR0912/PLR0915 ignore) +@tracer.start_as_current_span("call_load_out_answer") +async def _out_answer( # noqa: PLR0915 call: CallStateModel, client: CallAutomationClient, post_callback: Callable[[CallStateModel], Awaitable[None]], + scheduler: aiojobs.Scheduler, training_callback: Callable[[CallStateModel], Awaitable[None]], _iterations_remaining: int = 3, ) -> CallStateModel: @@ -52,8 +207,14 @@ async def load_llm_chat( # noqa: PLR0912, PLR0915 Returns the updated call model. """ - logger.info("Loading LLM chat") + # Add span attributes + span_attribute(CallAttributes.CALL_CHANNEL, "voice") + span_attribute(CallAttributes.CALL_MESSAGE, call.messages[-1].content) + # Reset recognition retry counter + call.recognition_retry = 0 + + # By default, play the loading sound play_loading_sound = True async def _tts_callback(text: str, style: MessageStyleEnum) -> None: @@ -61,35 +222,19 @@ async def _tts_callback(text: str, style: MessageStyleEnum) -> None: Send back the TTS to the user. 
""" nonlocal play_loading_sound - # For first TTS, interrupt loading sound and disable loading it - interrupt_queue = False if play_loading_sound: - interrupt_queue = True play_loading_sound = False - - await asyncio.gather( - handle_recognize_text( + # Play the TTS + await scheduler.spawn( + handle_play_text( call=call, client=client, - interrupt_queue=interrupt_queue, style=style, text=text, - ), # First, recognize the next voice - _db.call_aset( - call - ), # Second, save in DB allowing (1) user to cut off the Assistant and (2) SMS answers to be in order + ) ) - # Pointer - pointer_cache_key = f"{__name__}-load_llm_chat-pointer-{call.call_id}" - pointer_current = time.time() # Get system current time - await _cache.aset( - key=pointer_cache_key, - ttl_sec=await answer_hard_timeout_sec(), - value=str(pointer_current), - ) - # Chat chat_task = asyncio.create_task( _execute_llm_chat( @@ -129,16 +274,6 @@ def _clear_tasks() -> None: while True: logger.debug("Chat task status: %s", chat_task.done()) - if pointer_current < float( - (await _cache.aget(pointer_cache_key) or b"0").decode() - ): # Test if pointer updated by another instance - logger.warning("Another chat is running, stopping this one") - # Clean up Communication Services queue - await handle_clear_queue(call=call, client=client) - # Clean up tasks - _clear_tasks() - break - if chat_task.done(): # Break when chat coroutine is done # Clean up _clear_tasks() @@ -171,7 +306,7 @@ def _clear_tasks() -> None: ) soft_timeout_triggered = True # Never store the error message in the call history, it has caused hallucinations in the LLM - await handle_recognize_text( + await handle_play_text( call=call, client=client, store=False, @@ -202,32 +337,27 @@ def _clear_tasks() -> None: else: # Retry chat after an error logger.info("Retrying chat, %s remaining", _iterations_remaining - 1) - return await load_llm_chat( + return await _out_answer( call=call, client=client, post_callback=post_callback, + scheduler=scheduler, training_callback=training_callback, _iterations_remaining=_iterations_remaining - 1, ) - else: - if continue_chat and _iterations_remaining > 0: # Contiue chat - logger.info("Continuing chat, %s remaining", _iterations_remaining - 1) - return await load_llm_chat( - call=call, - client=client, - post_callback=post_callback, - training_callback=training_callback, - _iterations_remaining=_iterations_remaining - 1, - ) # Recursive chat (like for for retry or tools) - - # End chat - await handle_recognize_text( + elif continue_chat and _iterations_remaining > 0: # Contiue chat + logger.info("Continuing chat, %s remaining", _iterations_remaining - 1) + return await _out_answer( call=call, client=client, - no_response_error=True, - style=MessageStyleEnum.NONE, - text=None, - ) # Trigger an empty text to recognize and generate timeout error if user does not speak + post_callback=post_callback, + scheduler=scheduler, + training_callback=training_callback, + _iterations_remaining=_iterations_remaining - 1, + ) # Recursive chat (like for for retry or tools) + + # End chat + # TODO: Re-implement return call @@ -286,8 +416,14 @@ async def _content_callback( trainings=trainings, ) + # Initialize TTS callbacks + tts_callback = _tts_callback( + automation_client=client, + call=call, + ) + # Build plugins - plugins = LlmPlugins( + plugins = DefaultPlugin( call=call, client=client, post_callback=post_callback, @@ -298,7 +434,7 @@ async def _content_callback( if not use_tools: logger.warning("Tools disabled for this chat") else: - tools = await 
plugins.to_openai(call)
+ tools = await plugins.to_openai()
logger.debug("Tools: %s", tools)
# Execute LLM inference
@@ -403,3 +539,126 @@ async def _content_callback(
return False, True, call # TODO: Should we notify an error?
return False, False, call # No error, no retry
+
+
+# TODO: Refacto and simplify
+async def _in_audio( # noqa: PLR0913
+ bits_per_sample: int,
+ channels: int,
+ clear_audio_callback: Callable[[], Awaitable[None]],
+ in_stream: asyncio.Queue[bytes],
+ out_stream: PushAudioInputStream,
+ response_callback: Callable[[], Awaitable[None]],
+ sample_rate: int,
+) -> None:
+ clear_tts_task: asyncio.Task | None = None
+ flush_task: asyncio.Task | None = None
+
+ # Init VAD parameters
+ rms_threshold = await vad_threshold()
+ sample_width = bits_per_sample // 8
+ silence_duration_ms = await vad_silence_timeout_ms()
+
+ async def _flush_callback() -> None:
+ """
+ Flush the audio buffer if no audio is detected for a while.
+ """
+ nonlocal clear_tts_task
+
+ # Wait for the timeout
+ await asyncio.sleep(silence_duration_ms / 1000)
+
+ # Cancel the TTS clear task if any
+ if clear_tts_task:
+ clear_tts_task.cancel()
+ clear_tts_task = None
+
+ logger.debug(
+ "Timeout triggered after %ims, flushing audio buffer", silence_duration_ms
+ )
+
+ # Commit the buffer
+ await response_callback()
+
+ async def _clear_tts_callback() -> None:
+ """
+ Clear the TTS queue.
+ """
+ # Wait 200ms before clearing the TTS queue
+ await asyncio.sleep(0.2)
+
+ logger.debug("Voice detected, cancelling TTS")
+
+ # Clear the queue
+ await clear_audio_callback()
+
+ # Consumes audio stream
+ while True:
+ # Wait for the next audio packet
+ in_chunk = await in_stream.get()
+
+ # Load audio
+ in_audio: AudioSegment = AudioSegment(
+ channels=channels,
+ data=in_chunk,
+ frame_rate=sample_rate,
+ sample_width=sample_width,
+ )
+
+ # Confirm ASAP that the event is processed
+ in_stream.task_done()
+
+ # Always add the audio to the buffer
+ assert isinstance(in_audio.raw_data, bytes)
+ out_stream.write(in_audio.raw_data)
+
+ # Get the relative dB, silences should be at 1 to 5% of the max, so 0.1 to 0.5 of the threshold
+ in_empty = False
+ if min(in_audio.rms / in_audio.max_possible_amplitude * 10, 1) < rms_threshold:
+ in_empty = True
+ # Start timeout if not already started and VAD already triggered
+ if not flush_task:
+ flush_task = asyncio.create_task(_flush_callback())
+
+ if in_empty:
+ # Continue to the next audio packet
+ continue
+
+ # Voice detected, cancel the flush task if any
+ if flush_task:
+ flush_task.cancel()
+ flush_task = None
+
+ # Start the TTS clear task
+ if not clear_tts_task:
+ clear_tts_task = asyncio.create_task(_clear_tts_callback())
+
+
+def _tts_callback(
+ automation_client: CallAutomationClient,
+ call: CallStateModel,
+) -> Callable[[str, MessageStyleEnum], Awaitable[None]]:
+ """
+ Send back the TTS to the user.
+ """ + + @wraps(_tts_callback) + async def wrapper( + text: str, + style: MessageStyleEnum, + ) -> None: + await asyncio.gather( + handle_play_text( + call=call, + client=automation_client, + style=style, + text=text, + ), # First, play the TTS to the user + _db.call_aset( + call + ), # Second, save in DB allowing (1) user to cut off the Assistant and (2) SMS answers to be in order + ) + + return wrapper diff --git a/app/helpers/call_utils.py b/app/helpers/call_utils.py index 84bed3e6..3b017279 100644 --- a/app/helpers/call_utils.py +++ b/app/helpers/call_utils.py @@ -11,6 +11,9 @@ RecognizeInputType, SsmlSource, ) +from azure.communication.callautomation._generated.models import ( + StartMediaStreamingRequest, +) from azure.communication.callautomation.aio import ( CallAutomationClient, CallConnectionClient, @@ -18,7 +21,6 @@ from azure.core.exceptions import HttpResponseError, ResourceNotFoundError from app.helpers.config import CONFIG -from app.helpers.features import phone_silence_timeout_sec from app.helpers.logging import logger from app.models.call import CallStateModel from app.models.message import ( @@ -83,51 +85,6 @@ def tts_sentence_split( ) -# TODO: Disable or lower profanity filter. The filter seems enabled by default, it replaces words like "holes in my roof" by "*** in my roof". This is not acceptable for a call center. -async def _handle_recognize_media( # noqa: PLR0913 - call: CallStateModel, - client: CallAutomationClient, - context: ContextEnum | None, - interrupt: bool, - style: MessageStyleEnum, - text: str | None, -) -> None: - """ - Play a media to a call participant and start recognizing the response. - - If `context` is provided, it will be used to track the operation. - """ - logger.info("Recognizing voice: %s", text) - try: - assert call.voice_id, "Voice ID is required for recognizing media" - async with _use_call_client(client, call.voice_id) as call_client: - await call_client.start_recognizing_media( - end_silence_timeout=await phone_silence_timeout_sec(), - input_type=RecognizeInputType.SPEECH, - interrupt_call_media_operation=interrupt, - interrupt_prompt=True, - operation_context=_context_builder({context}), - play_prompt=( - _audio_from_text( - call=call, - style=style, - text=text, - ) - if text - else None - ), # If no text is provided, only recognize - speech_language=call.lang.short_code, - target_participant=PhoneNumberIdentifier(call.initiate.phone_number), # pyright: ignore - ) - except ResourceNotFoundError: - logger.debug("Call hung up before recognizing") - except HttpResponseError as e: - if "call already terminated" in e.message.lower(): - logger.debug("Call hung up before playing") - else: - raise e - - async def _handle_play_text( call: CallStateModel, client: CallAutomationClient, @@ -188,69 +145,6 @@ async def handle_media( raise e -async def handle_recognize_text( # noqa: PLR0913 - call: CallStateModel, - client: CallAutomationClient, - text: str | None, - context: ContextEnum | None = None, - interrupt_queue: bool = False, - no_response_error: bool = False, - store: bool = True, - style: MessageStyleEnum = MessageStyleEnum.NONE, -) -> None: - """ - Play a text to a call participant and start recognizing the response. - - If `store` is `True`, the text will be stored in the call messages. Starts by playing text, then the "ready" sound, and finally starts recognizing the response. 
- """ - # Only recognize - if not text: - await _handle_recognize_media( - call=call, - client=client, - context=context, - interrupt=interrupt_queue, - style=style, - text=None, - ) - return - - # Split text in chunks - chunks = await _chunk_before_tts( - call=call, - store=store, - style=style, - text=text, - ) - - # Play each chunk - for i, chunk in enumerate(chunks): - # For first chunk, interrupt media if needed - chunck_interrupt_queue = interrupt_queue if i == 0 else False - - # For last chunk, recognize media to let user to interrupt bot - if i == len(chunks) - 1: - if no_response_error: - await _handle_recognize_media( - call=call, - client=client, - context=context, - interrupt=chunck_interrupt_queue, - style=style, - text=chunk, - ) - return - - # For other chunks, play text - await _handle_play_text( - call=call, - client=client, - context=context, - style=style, - text=chunk, - ) - - async def handle_play_text( # noqa: PLR0913 call: CallStateModel, client: CallAutomationClient, @@ -464,6 +358,47 @@ async def handle_transfer( raise e +async def start_audio_streaming( + client: CallAutomationClient, + call: CallStateModel, +) -> None: + logger.info("Starting audio streaming") + try: + assert call.voice_id, "Voice ID is required to control the call" + async with _use_call_client(client, call.voice_id) as call_client: + # TODO: Use the public API once the "await" have been fixed + # await call_client.start_media_streaming() + await call_client._call_media_client.start_media_streaming( + call_connection_id=call_client._call_connection_id, + start_media_streaming_request=StartMediaStreamingRequest(), + ) + except ResourceNotFoundError: + logger.debug("Call hung up before starting streaming") + except HttpResponseError as e: + if "call already terminated" in e.message.lower(): + logger.debug("Call hung up before starting streaming") + else: + raise e + + +async def stop_audio_streaming( + client: CallAutomationClient, + call: CallStateModel, +) -> None: + logger.info("Stopping audio streaming") + try: + assert call.voice_id, "Voice ID is required to control the call" + async with _use_call_client(client, call.voice_id) as call_client: + await call_client.stop_media_streaming() + except ResourceNotFoundError: + logger.debug("Call hung up before stopping streaming") + except HttpResponseError as e: + if "call already terminated" in e.message.lower(): + logger.debug("Call hung up before stopping streaming") + else: + raise e + + def _context_builder(contexts: set[ContextEnum | None] | None) -> str | None: if not contexts: return None diff --git a/app/helpers/config_models/cognitive_service.py b/app/helpers/config_models/cognitive_service.py index 8c8a6820..891e3d03 100644 --- a/app/helpers/config_models/cognitive_service.py +++ b/app/helpers/config_models/cognitive_service.py @@ -3,3 +3,5 @@ class CognitiveServiceModel(BaseModel): endpoint: str + region: str + resource_id: str diff --git a/app/helpers/config_models/prompts.py b/app/helpers/config_models/prompts.py index 488aee49..9fac74e4 100644 --- a/app/helpers/config_models/prompts.py +++ b/app/helpers/config_models/prompts.py @@ -20,7 +20,6 @@ class SoundModel(BaseModel): loading_tpl: str = "{public_url}/loading.wav" - ready_tpl: str = "{public_url}/ready.wav" def loading(self) -> str: from app.helpers.config import CONFIG @@ -29,13 +28,6 @@ def loading(self) -> str: public_url=CONFIG.resources.public_url, ) - def ready(self) -> str: - from app.helpers.config import CONFIG - - return self.ready_tpl.format( - 
public_url=CONFIG.resources.public_url, - ) - class LlmModel(BaseModel): """ @@ -118,7 +110,7 @@ class LlmModel(BaseModel): ## Example 1 Conversation objective: Help the customer with their accident. Customer will be calling from a car, with the SOS button. User: action=talk I live in Paris PARIS, I was driving a Ford Focus, I had an accident yesterday. - Tools: update indicent location, update vehicule reference, update incident date + Tools: update indicent location, update vehicule reference, update incident date, get trainings for the car model Assistant: style=sad I understand, your car has been in an accident. style=none Let me think... I have updated your file. Now, could I have the license plate number of your car? Also were there any injuries? ## Example 2 @@ -126,7 +118,7 @@ class LlmModel(BaseModel): Assistant: Hello, I'm Marc, the virtual assistant. I'm here to help you. Don't hesitate to ask me anything. Assistant: I'm specialized in insurance contracts. We can discuss that together. How can I help you today? User: action=talk The roof has had holes since yesterday's big storm. They're about the size of golf balls. I'm worried about water damage. - Tools: update incident description, create a reminder for assistant to plan an appointment with a roofer + Tools: update incident description, get trainings for contract details and claim history, create a reminder for assistant to plan an appointment with a roofer Assistant: style=sad I know what you mean... I see, your roof has holes since the big storm yesterday. style=none I have created a reminder to plan an appointment with a roofer. style=cheerful I hope you are safe and sound! Take care of yourself... style=none Can you confirm me the address of the house plus the date of the storm? ## Example 3 @@ -139,13 +131,13 @@ class LlmModel(BaseModel): Assistant: Hello, I'm John, the virtual assistant. I'm here to help you. Don't hesitate to ask me anything. Assistant: I'm specialized in home care services. How can I help you today? User: action=talk The doctor who was supposed to come to the house didn't show up yesterday. - Tools: create a reminder for assistant to call the doctor to reschedule the appointment, create a reminder for assistant to call the customer in two days to check if the doctor came + Tools: create a reminder for assistant to call the doctor to reschedule the appointment, create a reminder for assistant to call the customer in two days to check if the doctor came, get trainings for the scheduling policy of the doctor Assistant: style=sad Let me see, the doctor did not come to your home yesterday... I'll do my best to help you. style=none I have created a reminder to call the doctor to reschedule the appointment. Now, it should be better for you. And, I'll tale care tomorrow to see if the doctor came. style=cheerful Is it the first time the doctor didn't come? ## Example 5 Conversation objective: Assistant is a call center agent for a car insurance company. Help through the claim process. User: action=call I had an accident this morning, I was shopping. My car is at home, at 134 Rue de Rivoli. - Tools: update incident location, update incident description + Tools: update incident location, update incident description, get trainings for the claim process Assistant: style=sad I understand, you had an accident this morning while shopping. style=none I have updated your file with the location you are at Rue de Rivoli. Can you tell me more about the accident? 
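
The recognizer set up in `load_llm_chat()` above authenticates the Speech SDK with an Entra ID token packed as `aad#<resource_id>#<token>`. A standalone sketch of the same push-stream wiring, with placeholder resource ID and region, feeding one dummy silent chunk:

```python
# Minimal push-stream recognizer mirroring the load_llm_chat() wiring (sketch).
from azure.cognitiveservices.speech import AudioConfig, SpeechConfig, SpeechRecognizer
from azure.cognitiveservices.speech.audio import AudioStreamFormat, PushAudioInputStream
from azure.identity import DefaultAzureCredential

resource_id = "/subscriptions/xxx/resourceGroups/xxx/providers/Microsoft.CognitiveServices/accounts/xxx"  # placeholder
aad_token = DefaultAzureCredential().get_token(
    "https://cognitiveservices.azure.com/.default"
).token

stream = PushAudioInputStream(
    stream_format=AudioStreamFormat(
        bits_per_sample=16, channels=1, samples_per_second=16000
    ),
)
config = SpeechConfig(
    auth_token=f"aad#{resource_id}#{aad_token}",  # AAD auth format expected by the SDK
    region="swedencentral",
)
recognizer = SpeechRecognizer(
    audio_config=AudioConfig(stream=stream),
    language="en-US",
    speech_config=config,
)
recognizer.recognizing.connect(lambda evt: print("partial:", evt.result.text))
recognizer.recognized.connect(lambda evt: print("final:", evt.result.text))
recognizer.start_continuous_recognition()
stream.write(b"\x00" * 3200)  # 100 ms of 16 kHz 16-bit mono PCM silence
```
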
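The silence gate in `_in_audio()` above scales relative loudness as `rms / max_possible_amplitude * 10`, clamped to 1, so true silence lands around 0.1 to 0.5, just under the default `vad_threshold` of 0.5. A quick pydub sanity check of that scaling; `is_silence` is a hypothetical helper mirroring the inline test:

```python
# Reproduce the _in_audio() silence test on synthetic audio (sketch).
from pydub import AudioSegment
from pydub.generators import Sine

def is_silence(segment: AudioSegment, threshold: float = 0.5) -> bool:
    # Same scaling as _in_audio(): relative RMS, x10, clamped to 1
    return min(segment.rms / segment.max_possible_amplitude * 10, 1) < threshold

tone = Sine(440).to_audio_segment(duration=100)  # full-scale tone, scores ~1
quiet = tone - 40  # attenuated by 40 dB, scores ~0.07
print(is_silence(tone), is_silence(quiet))  # False True
```
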
User: action=hungup
User: action=call
Assistant: style=none Hello, I hope you are calling back for the same claim. style=cheerful How can I help you today?

## Example 6
Conversation objective: Fill the claim with the customer. Claim is about a car accident.
User: action=talk I had an accident this morning, I was shopping. Let me send the exact location by SMS.
User: action=sms At the corner of Rue de la Paix and Rue de Rivoli.
- Tools: update incident location
+ Tools: update incident location
Assistant: style=sad I get it, you had an accident this morning while shopping. style=none I have updated your file with the location you sent me by SMS. style=cheerful Is it correct?

## Example 7
Conversation objective: Support the customer in its car. Customer pressed the SOS button.
User: action=call I pressed the SOS button, but it was a mistake.
Assistant: style=cheerful I understand, mistakes happen. style=none Can I do anything else for you?

## Example 8
Conversation objective: Gather feedbacks after an in-person meeting between a sales representative and the customer.
User: action=talk Can you talk a bit slower?
- Tools: update voice speed
+ Tools: update voice speed, get trainings for the escalation process
Assistant: style=none I will talk slower. If you need me to repeat something, just ask me. Now, can you tall me a bit more about the meeting? How did it go?

## Example 9
diff --git a/app/helpers/features.py b/app/helpers/features.py
index 5a312a5c..6945e59e 100644
--- a/app/helpers/features.py
+++ b/app/helpers/features.py
@@ -1,17 +1,19 @@
from typing import TypeVar, cast

from azure.appconfiguration.aio import AzureAppConfigurationClient
+from azure.core.exceptions import ResourceNotFoundError

from app.helpers.config import CONFIG
from app.helpers.config_models.cache import MemoryModel
from app.helpers.http import azure_transport
from app.helpers.identity import credential
+from app.helpers.logging import logger
from app.persistence.icache import ICache
from app.persistence.memory import MemoryCache

_cache: ICache = MemoryCache(MemoryModel(max_size=100))
_client: AzureAppConfigurationClient | None = None
-T = TypeVar("T", bool, int, str)
+T = TypeVar("T", bool, int, float, str)

async def answer_hard_timeout_sec() -> int:
@@ -26,8 +28,12 @@ async def callback_timeout_hour() -> int:
return await _get(key="callback_timeout_hour", type_res=int) or 24

-async def phone_silence_timeout_sec() -> int:
- return await _get(key="phone_silence_timeout_sec", type_res=int) or 10
+async def vad_silence_timeout_ms() -> int:
+ return await _get(key="vad_silence_timeout_ms", type_res=int) or 500
+
+
+async def vad_threshold() -> float:
+ return await _get(key="vad_threshold", type_res=float) or 0.5

async def recording_enabled() -> bool:
@@ -38,8 +44,8 @@ async def slow_llm_for_chat() -> bool:
return await _get(key="slow_llm_for_chat", type_res=bool) or True

-async def voice_recognition_retry_max() -> int:
- return await _get(key="voice_recognition_retry_max", type_res=int) or 3
+async def recognition_retry_max() -> int:
+ return await _get(key="recognition_retry_max", type_res=int) or 3

async def _get(key: str, type_res: type[T]) -> T | None:
@@ -49,10 +55,14 @@
if cached:
return _parse(value=cached.decode(), type_res=type_res)
# Try live
- async with await _use_client() as client:
- setting = await client.get_configuration_setting(key)
- # Return default if not found
- if not setting:
+ try:
+ async with await _use_client() as client:
+ setting = await client.get_configuration_setting(key)
+ # Return default if not found
+ if not setting:
+ return
+ except ResourceNotFoundError:
+ logger.warning("Setting %s not found", key)
return
# Update cache
await _cache.aset(
@@ -85,11 +95,16 @@ def _cache_key(key: str) -> str:
return
f"{__name__}-{key}" -def _parse(value: str, type_res: type[T]) -> T: - if type_res is bool: - return cast(T, value.lower() == "true") - if type_res is int: - return cast(T, int(value)) - if type_res is str: - return cast(T, str(value)) - raise ValueError(f"Unsupported type: {type_res}") +def _parse(value: str, type_res: type[T]) -> T | None: + try: + if type_res is bool: + return cast(T, value.lower() == "true") + if type_res is int: + return cast(T, int(value)) + if type_res is float: + return cast(T, float(value)) + if type_res is str: + return cast(T, str(value)) + raise ValueError(f"Unsupported type: {type_res}") + except ValueError: + pass diff --git a/app/helpers/llm_tools.py b/app/helpers/llm_tools.py index 7449c64b..139363ea 100644 --- a/app/helpers/llm_tools.py +++ b/app/helpers/llm_tools.py @@ -1,16 +1,14 @@ import asyncio from collections.abc import Awaitable, Callable from html import escape -from inspect import getmembers, isfunction from typing import Annotated, Literal, TypedDict from azure.communication.callautomation.aio import CallAutomationClient -from openai.types.chat import ChatCompletionToolParam from pydantic import ValidationError from app.helpers.call_utils import ContextEnum as CallContextEnum, handle_play_text from app.helpers.config import CONFIG -from app.helpers.llm_utils import function_schema +from app.helpers.llm_utils import AbstractPlugin from app.helpers.logging import logger from app.models.call import CallStateModel from app.models.message import ( @@ -31,8 +29,7 @@ class UpdateClaimDict(TypedDict): value: str -class LlmPlugins: - call: CallStateModel +class DefaultPlugin(AbstractPlugin): client: CallAutomationClient post_callback: Callable[[CallStateModel], Awaitable[None]] style: MessageStyleEnum = MessageStyleEnum.NONE @@ -45,7 +42,7 @@ def __init__( post_callback: Callable[[CallStateModel], Awaitable[None]], tts_callback: Callable[[str, MessageStyleEnum], Awaitable[None]], ): - self.call = call + super().__init__(call) self.client = client self.post_callback = post_callback self.tts_callback = tts_callback @@ -581,13 +578,3 @@ async def speech_lang( await self.tts_callback(customer_response, self.style) # LLM confirmation return f"Voice language set to {lang} (was {initial_lang})" - - @staticmethod - async def to_openai(call: CallStateModel) -> list[ChatCompletionToolParam]: - return await asyncio.gather( - *[ - function_schema(arg_type, call=call) - for name, arg_type in getmembers(LlmPlugins, isfunction) - if not name.startswith("_") and name != "to_openai" - ] - ) diff --git a/app/helpers/llm_utils.py b/app/helpers/llm_utils.py index be3471bc..d26e3f74 100644 --- a/app/helpers/llm_utils.py +++ b/app/helpers/llm_utils.py @@ -3,8 +3,10 @@ See: https://github.com/microsoft/autogen/blob/2750391f847b7168d842dfcb815ac37bd94c9a0e/autogen/function_utils.py """ +import asyncio import inspect from collections.abc import Callable +from inspect import getmembers, isfunction from textwrap import dedent from typing import Annotated, Any, ForwardRef, TypeVar @@ -16,6 +18,7 @@ from pydantic.json_schema import JsonSchemaValue from app.helpers.logging import logger +from app.models.call import CallStateModel T = TypeVar("T") _jinja = Environment( @@ -34,7 +37,26 @@ class Parameters(BaseModel): type: str = "object" -async def function_schema( +class AbstractPlugin: + call: CallStateModel + + def __init__( + self, + call: CallStateModel, + ): + self.call = call + + async def to_openai(self) -> list[ChatCompletionToolParam]: + return await asyncio.gather( + *[ + 
_function_schema(arg_type, call=self.call) + for name, arg_type in getmembers(self.__class__, isfunction) + if not name.startswith("_") and name != "to_openai" + ] + ) + + +async def _function_schema( f: Callable[..., Any], **kwargs: Any ) -> ChatCompletionToolParam: """ @@ -82,7 +104,7 @@ async def function_schema( ) ).model_dump() - function = ChatCompletionToolParam( + return ChatCompletionToolParam( type="function", function=FunctionDefinition( description=description, @@ -91,8 +113,6 @@ async def function_schema( ), ) - return function - def _typed_annotation(annotation: Any, global_namespace: dict[str, Any]) -> Any: """ diff --git a/app/main.py b/app/main.py index ac200ec6..652935c3 100644 --- a/app/main.py +++ b/app/main.py @@ -1,16 +1,23 @@ import asyncio import json +from base64 import b64decode from contextlib import asynccontextmanager from datetime import timedelta from http import HTTPStatus from os import getenv -from typing import Annotated +from typing import Annotated, Any from urllib.parse import quote_plus, urljoin from uuid import UUID import jwt import mistune -from azure.communication.callautomation import PhoneNumberIdentifier +from azure.communication.callautomation import ( + MediaStreamingAudioChannelType, + MediaStreamingContentType, + MediaStreamingOptions, + MediaStreamingTransportType, + PhoneNumberIdentifier, +) from azure.communication.callautomation.aio import CallAutomationClient from azure.core.credentials import AzureKeyCredential from azure.core.messaging import CloudEvent @@ -21,16 +28,19 @@ HTTPException, Request, Response, + WebSocket, ) from fastapi.exceptions import RequestValidationError, ValidationException from fastapi.responses import HTMLResponse, JSONResponse from htmlmin.minify import html_minify from jinja2 import Environment, FileSystemLoader from pydantic import Field, TypeAdapter, ValidationError +from starlette.datastructures import Headers from starlette.exceptions import HTTPException as StarletteHTTPException from twilio.twiml.messaging_response import MessagingResponse from app.helpers.call_events import ( + on_audio_connected, on_call_connected, on_call_disconnected, on_end_call, @@ -38,10 +48,8 @@ on_new_call, on_play_completed, on_play_error, - on_recognize_timeout_error, - on_recognize_unknown_error, + on_recognize_error, on_sms_received, - on_speech_recognized, on_transfer_completed, on_transfer_error, ) @@ -99,11 +107,16 @@ # Communication Services callback assert CONFIG.public_domain, "public_domain config is not set" +_COMMUNICATIONSERVICES_WSS_TPL = urljoin( + str(CONFIG.public_domain).replace("https://", "wss://"), + "/communicationservices/wss/{call_id}/{callback_secret}", +) +logger.info("Using WebSocket URL %s", _COMMUNICATIONSERVICES_WSS_TPL) _COMMUNICATIONSERVICES_CALLABACK_TPL = urljoin( str(CONFIG.public_domain), - "/communicationservices/event/{call_id}/{callback_secret}", + "/communicationservices/callback/{call_id}/{callback_secret}", ) -logger.info("Using call event URL %s", _COMMUNICATIONSERVICES_CALLABACK_TPL) +logger.info("Using callback URL %s", _COMMUNICATIONSERVICES_CALLABACK_TPL) @asynccontextmanager @@ -367,13 +380,23 @@ async def call_post(request: Request) -> CallGetModel: except ValidationError as e: raise RequestValidationError([str(e)]) from e - url, call = await _communicationservices_event_url(initiate.phone_number, initiate) + callback_url, wss_url, call = await _communicationservices_urls( + initiate.phone_number, initiate + ) span_attribute(CallAttributes.CALL_ID, str(call.call_id)) 
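
With `AbstractPlugin` above, tool discovery is attached to the plugin class itself: every public coroutine is converted by `to_openai()` into an OpenAI tool schema, with `Annotated` metadata feeding the parameter descriptions. A hypothetical subclass to illustrate the contract (not part of this diff):

```python
# Hypothetical plugin showing the AbstractPlugin contract (sketch).
from typing import Annotated

from app.helpers.llm_utils import AbstractPlugin

class WeatherPlugin(AbstractPlugin):
    async def get_weather(
        self,
        city: Annotated[str, "Name of the city to look up"],
    ) -> str:
        """
        Return the current weather for a city.
        """
        return f"Sunny in {city}"

# tools = await WeatherPlugin(call).to_openai()  # call: a CallStateModel instance
```
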
span_attribute(CallAttributes.CALL_PHONE_NUMBER, call.initiate.phone_number) automation_client = await _use_automation_client() + streaming_options = MediaStreamingOptions( + audio_channel_type=MediaStreamingAudioChannelType.UNMIXED, + content_type=MediaStreamingContentType.AUDIO, + start_media_streaming=False, + transport_type=MediaStreamingTransportType.WEBSOCKET, + transport_url=wss_url, + ) call_connection_properties = await automation_client.create_call( - callback_url=url, + callback_url=callback_url, cognitive_services_endpoint=CONFIG.cognitive_service.endpoint, + media_streaming=streaming_options, source_caller_id_number=_source_caller, target_participant=PhoneNumberIdentifier(initiate.phone_number), # pyright: ignore ) @@ -398,21 +421,22 @@ async def call_event( event = EventGridEvent.from_json(call.content) event_type = event.event_type - logger.debug("Call event with data %s", event.data) if not event_type == SystemEventNames.AcsIncomingCallEventName: logger.warning("Event %s not supported", event_type) + logger.debug("Event data %s", event.data) return call_context: str = event.data["incomingCallContext"] phone_number = PhoneNumber(event.data["from"]["phoneNumber"]["value"]) - url, _call = await _communicationservices_event_url(phone_number) + callback_url, wss_url, _call = await _communicationservices_urls(phone_number) span_attribute(CallAttributes.CALL_ID, str(_call.call_id)) span_attribute(CallAttributes.CALL_PHONE_NUMBER, _call.initiate.phone_number) await on_new_call( - callback_url=url, + callback_url=callback_url, client=await _use_automation_client(), incoming_context=call_context, phone_number=phone_number, + wss_url=wss_url, ) @@ -444,37 +468,17 @@ async def sms_event( return span_attribute(CallAttributes.CALL_ID, str(call.call_id)) - async def _post_callback(_call: CallStateModel) -> None: - await _trigger_post_event(_call) - - async def _training_callback(_call: CallStateModel) -> None: - await _trigger_training_event(_call) - await on_sms_received( call=call, - client=await _use_automation_client(), message=message, - post_callback=_post_callback, - training_callback=_training_callback, ) -@api.post("/communicationservices/event/{call_id}/{secret}") -@tracer.start_as_current_span("communicationservices_event_post") -async def communicationservices_event_post( - call_id: UUID, - secret: Annotated[str, Field(min_length=16, max_length=16)], - request: Request, -) -> None | ErrorModel: - """ - Handle direct events from Azure Communication Services for a running call. - - No parameters are expected. The body is a list of JSON objects `CloudEvent`. - - Returns a 204 No Content if the events are properly formatted. A 401 Unauthorized if the JWT token is invalid. Otherwise, returns a 400 Bad Request. 
- """ +async def _communicationservices_validate_jwt( + headers: Headers, +) -> None: # Validate JWT token - service_jwt: str | None = request.headers.get("Authorization") + service_jwt: str | None = headers.get("Authorization") if not service_jwt: raise HTTPException( detail="Authorization header missing", @@ -501,6 +505,108 @@ async def communicationservices_event_post( status_code=HTTPStatus.UNAUTHORIZED, ) from e + +async def _communicationservices_validate_call_id( + call_id: UUID, + secret: str, +) -> CallStateModel: + span_attribute(CallAttributes.CALL_ID, str(call_id)) + + call = await _db.call_aget(call_id) + if not call: + raise HTTPException( + detail=f"Call {call_id} not found", + status_code=HTTPStatus.NOT_FOUND, + ) + if call.callback_secret != secret: + raise HTTPException( + detail="Secret does not match", + status_code=HTTPStatus.UNAUTHORIZED, + ) + + span_attribute(CallAttributes.CALL_PHONE_NUMBER, call.initiate.phone_number) + return call + + +@api.websocket("/communicationservices/wss/{call_id}/{secret}") +@tracer.start_as_current_span("communicationservices_event_post") +async def communicationservices_wss_post( + call_id: UUID, + secret: Annotated[str, Field(min_length=16, max_length=16)], + websocket: WebSocket, +) -> None: + # Validate connection + # TODO: Uncomment when JWT validation is fixed + # await _communicationservices_validate_jwt(websocket.headers) + call = await _communicationservices_validate_call_id(call_id, secret) + + # Accept connection + await websocket.accept() + logger.info("WebSocket connection established for call %s", call.call_id) + + # Client SDK + automation_client = await _use_automation_client() + + # Queue audio data + audio_queue: asyncio.Queue[bytes] = asyncio.Queue() + + async def _consume_audio() -> None: + logger.debug("Audio data consumer started") + async for event in websocket.iter_json(): + # DEBUG: Uncomment to see all events, but be careful, it will be verbose ^_^ + # logger.debug("Audio event received %s", event) + + # TODO: Handle configuration event (audio format, sample rate, etc.) + # Skip non-audio events + if "kind" not in event or event["kind"] != "AudioData": + continue + + # Filter out silent audio + audio_data: dict[str, Any] = event.get("audioData", {}) + audio_base64: str | None = audio_data.get("data", None) + audio_silent: bool | None = audio_data.get("silent", True) + if audio_silent or not audio_base64: + continue + + # Queue audio + await audio_queue.put(b64decode(audio_base64)) + + await asyncio.gather( + # Consume audio from the WebSocket + _consume_audio(), + # Process audio + # TODO: Dynamically set the audio format + on_audio_connected( + audio_bits_per_sample=16, + audio_channels=1, + audio_sample_rate=16000, + audio_stream=audio_queue, + call=call, + client=automation_client, + post_callback=_trigger_post_event, + training_callback=_trigger_training_event, + ), + ) + + +@api.post("/communicationservices/callback/{call_id}/{secret}") +@tracer.start_as_current_span("communicationservices_callback_post") +async def communicationservices_callback_post( + call_id: UUID, + request: Request, + secret: Annotated[str, Field(min_length=16, max_length=16)], +) -> None: + """ + Handle direct events from Azure Communication Services for a running call. + + No parameters are expected. The body is a list of JSON objects `CloudEvent`. + + Returns a 204 No Content if the events are properly formatted. A 401 Unauthorized if the JWT token is invalid. Otherwise, returns a 400 Bad Request. 
+ """ + + # Validate connection + await _communicationservices_validate_jwt(request.headers) + # Validate request events = await request.json() if not events or not isinstance(events, list): @@ -520,7 +626,7 @@ async def communicationservices_event_post( # TODO: Refacto, too long (and remove PLR0912/PLR0915 ignore) -async def _communicationservices_event_worker( # noqa: PLR0912, PLR0915 +async def _communicationservices_event_worker( call_id: UUID, event_dict: dict, secret: str, @@ -540,16 +646,10 @@ async def _communicationservices_event_worker( # noqa: PLR0912, PLR0915 Returns None. Can trigger additional events to `training` and `post` queues. """ - span_attribute(CallAttributes.CALL_ID, str(call_id)) - call = await _db.call_aget(call_id) - if not call: - logger.warning("Call %s not found", call_id) - return - if call.callback_secret != secret: - logger.warning("Secret for call %s does not match", call_id) - return - span_attribute(CallAttributes.CALL_PHONE_NUMBER, call.initiate.phone_number) + # Validate connection + call = await _communicationservices_validate_call_id(call_id, secret) + # Event parsing event = CloudEvent.from_dict(event_dict) assert isinstance(event.data, dict) @@ -564,116 +664,78 @@ async def _communicationservices_event_worker( # noqa: PLR0912, PLR0915 # Client SDK automation_client = await _use_automation_client() - logger.debug("Call event received %s for call %s", event_type, call) - logger.debug(event.data) - - async def _post_callback(_call: CallStateModel) -> None: - await _trigger_post_event(_call) - - async def _training_callback(_call: CallStateModel) -> None: - await _trigger_training_event(_call) - - if event_type == "Microsoft.Communication.CallConnected": # Call answered - server_call_id = event.data["serverCallId"] - await on_call_connected( - call=call, - client=automation_client, - post_callback=_post_callback, - server_call_id=server_call_id, - training_callback=_training_callback, - ) + # Log + logger.debug("Call event received %s for call %s", event_type, call.call_id) - elif event_type == "Microsoft.Communication.CallDisconnected": # Call hung up - await on_call_disconnected( - call=call, - client=automation_client, - post_callback=_post_callback, - ) + match event_type: + case "Microsoft.Communication.CallConnected": # Call answered + server_call_id = event.data["serverCallId"] + await on_call_connected( + call=call, + client=automation_client, + server_call_id=server_call_id, + ) - elif ( - event_type == "Microsoft.Communication.RecognizeCompleted" - ): # Speech recognized - recognition_result: str = event.data["recognitionType"] + case "Microsoft.Communication.CallDisconnected": # Call hung up + await on_call_disconnected( + call=call, + client=automation_client, + post_callback=_trigger_post_event, + ) - if recognition_result == "speech": # Handle voice - speech_text: str | None = event.data["speechResult"]["speech"] - if speech_text: - await on_speech_recognized( + case "Microsoft.Communication.RecognizeCompleted": # Speech/IVR recognized + recognition_result: str = event.data["recognitionType"] + if recognition_result == "choices": # Handle IVR + label_detected: str = event.data["choiceResult"]["label"] + await on_ivr_recognized( call=call, client=automation_client, - post_callback=_post_callback, - text=speech_text, - training_callback=_training_callback, + label=label_detected, ) - elif recognition_result == "choices": # Handle IVR - label_detected: str = event.data["choiceResult"]["label"] - await on_ivr_recognized( + case 
"Microsoft.Communication.RecognizeFailed": # Speech/IVR failed + result_information = event.data["resultInformation"] + error_code: int = result_information["subCode"] + error_message: str = result_information["message"] + logger.debug( + "Speech recognition failed with error code %s: %s", + error_code, + error_message, + ) + await on_recognize_error( call=call, client=automation_client, - label=label_detected, - post_callback=_post_callback, - training_callback=_training_callback, + contexts=operation_contexts, ) - elif ( - event_type == "Microsoft.Communication.RecognizeFailed" - ): # Speech recognition failed - result_information = event.data["resultInformation"] - error_code: int = result_information["subCode"] - error_message: str = result_information["message"] - logger.debug( - "Speech recognition failed with error code %s: %s", - error_code, - error_message, - ) - # Error codes: - # 8510 = Action failed, initial silence timeout reached - # 8532 = Action failed, inter-digit silence timeout reached - # See: https://github.com/MicrosoftDocs/azure-docs/blob/main/articles/communication-services/how-tos/call-automation/recognize-action.md#event-codes - if error_code in (8510, 8532): # Timeout retry - await on_recognize_timeout_error( + case "Microsoft.Communication.PlayCompleted": # Media played + await on_play_completed( call=call, client=automation_client, contexts=operation_contexts, - post_callback=_post_callback, - training_callback=_training_callback, + post_callback=_trigger_post_event, ) - else: # Unknown error - await on_recognize_unknown_error( + + case "Microsoft.Communication.PlayFailed": # Media play failed + result_information = event.data["resultInformation"] + error_code: int = result_information["subCode"] + await on_play_error(error_code) + + case "Microsoft.Communication.CallTransferAccepted": # Call transfer accepted + await on_transfer_completed() + + case "Microsoft.Communication.CallTransferFailed": # Call transfer failed + result_information = event.data["resultInformation"] + sub_code: int = result_information["subCode"] + await on_transfer_error( call=call, client=automation_client, - error_code=error_code, + error_code=sub_code, ) - elif event_type == "Microsoft.Communication.PlayCompleted": # Media played - await on_play_completed( - call=call, - client=automation_client, - contexts=operation_contexts, - post_callback=_post_callback, - ) - - elif event_type == "Microsoft.Communication.PlayFailed": # Media play failed - result_information = event.data["resultInformation"] - error_code: int = result_information["subCode"] - await on_play_error(error_code) - - elif ( - event_type == "Microsoft.Communication.CallTransferAccepted" - ): # Call transfer accepted - await on_transfer_completed() - - elif ( - event_type == "Microsoft.Communication.CallTransferFailed" - ): # Call transfer failed - result_information = event.data["resultInformation"] - sub_code: int = result_information["subCode"] - await on_transfer_error( - call=call, - client=automation_client, - error_code=sub_code, - ) + case _: + logger.warning("Event %s not supported", event_type) + logger.debug("Event data %s", event.data) await _db.call_aset( call @@ -711,7 +773,7 @@ async def post_event( if not call: logger.warning("Call %s not found", post.content) return - logger.debug("Post event received for call %s", call) + logger.debug("Post event received for call %s", call.call_id) span_attribute(CallAttributes.CALL_ID, str(call.call_id)) span_attribute(CallAttributes.CALL_PHONE_NUMBER, 
call.initiate.phone_number) await on_end_call(call) @@ -731,13 +793,15 @@ async def _trigger_post_event(call: CallStateModel) -> None: await _post_queue.send_message(str(call.call_id)) -async def _communicationservices_event_url( +async def _communicationservices_urls( phone_number: PhoneNumber, initiate: CallInitiateModel | None = None -) -> tuple[str, CallStateModel]: +) -> tuple[str, str, CallStateModel]: """ Generate the callback URL for a call. If the caller has already called, use the same call ID, to keep the conversation history. Otherwise, create a new call ID. + + Returnes a tuple of the callback URL, the WebSocket URL, and the call object. """ call = await _db.call_asearch_one(phone_number) if not call or ( @@ -751,11 +815,15 @@ async def _communicationservices_event_url( ) ) await _db.call_aset(call) # Create for the first time - url = _COMMUNICATIONSERVICES_CALLABACK_TPL.format( + wss_url = _COMMUNICATIONSERVICES_WSS_TPL.format( callback_secret=call.callback_secret, call_id=str(call.call_id), ) - return url, call + callaback_url = _COMMUNICATIONSERVICES_CALLABACK_TPL.format( + callback_secret=call.callback_secret, + call_id=str(call.call_id), + ) + return callaback_url, wss_url, call # TODO: Secure this endpoint with a secret, either in the Authorization header or in the URL @@ -782,18 +850,9 @@ async def twilio_sms_post( else: span_attribute(CallAttributes.CALL_ID, str(call.call_id)) - async def _post_callback(_call: CallStateModel) -> None: - await _trigger_post_event(_call) - - async def _training_callback(_call: CallStateModel) -> None: - await _trigger_training_event(_call) - event_status = await on_sms_received( call=call, - client=await _use_automation_client(), message=Body, - post_callback=_post_callback, - training_callback=_training_callback, ) if not event_status: raise HTTPException( diff --git a/app/models/call.py b/app/models/call.py index 5447db95..cd93ce28 100644 --- a/app/models/call.py +++ b/app/models/call.py @@ -13,7 +13,12 @@ ) from app.helpers.monitoring import tracer from app.helpers.pydantic_types.phone_numbers import PhoneNumber -from app.models.message import ActionEnum as MessageActionEnum, MessageModel +from app.models.message import ( + ActionEnum as MessageActionEnum, + MessageModel, + PersonaEnum as MessagePersonaEnum, + StyleEnum as MessageStyleEnum, +) from app.models.next import NextModel from app.models.reminder import ReminderModel from app.models.synthesis import SynthesisModel @@ -145,4 +150,19 @@ async def trainings(self, cache_only: bool = True) -> list[TrainingModel]: return trainings def tz(self) -> tzinfo: + """ + Get the timezone of the phone number. + """ return PhoneNumber.tz(self.initiate.phone_number) + + def last_assistant_style(self) -> MessageStyleEnum: + """ + Get the last assistant message style. 
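+
+        Returns `MessageStyleEnum.NONE` if no assistant message has been sent yet.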
+ """ + inverted_messages = self.messages.copy() + inverted_messages.reverse() + for message in inverted_messages: + if message.persona != MessagePersonaEnum.ASSISTANT: + continue + return message.style + return MessageStyleEnum.NONE diff --git a/app/models/message.py b/app/models/message.py index e5605945..bb4fdcd5 100644 --- a/app/models/message.py +++ b/app/models/message.py @@ -81,7 +81,7 @@ def __add__(self, other: ChoiceDeltaToolCall) -> "ToolModel": self.function_arguments += other.function.arguments return self - async def execute_function(self, plugins: object) -> None: + async def execute_function(self, plugin: object) -> None: from app.helpers.logging import logger json_str = self.function_arguments @@ -96,7 +96,7 @@ async def execute_function(self, plugins: object) -> None: # Try to fix JSON args to catch LLM hallucinations # See: https://community.openai.com/t/gpt-4-1106-preview-messes-up-function-call-parameters-encoding/478500 - args: dict[str, Any] = repair_json( + args: dict[str, Any] | Any = repair_json( json_str=json_str, return_objects=True, ) # pyright: ignore @@ -119,7 +119,7 @@ async def execute_function(self, plugins: object) -> None: }, ) as span: try: - res = await getattr(plugins, name)(**args) + res = await getattr(plugin, name)(**args) res_log = f"{res[:20]}...{res[-20:]}" logger.info("Executing function %s (%s): %s", name, args, res_log) except TypeError as e: @@ -146,10 +146,10 @@ async def execute_function(self, plugins: object) -> None: @staticmethod def _available_function_names() -> list[str]: from app.helpers.llm_tools import ( - LlmPlugins, + DefaultPlugin, ) - return [name for name, _ in getmembers(LlmPlugins, isfunction)] + return [name for name, _ in getmembers(DefaultPlugin, isfunction)] class MessageModel(BaseModel): @@ -216,6 +216,7 @@ def to_openai( tool_call_id=tool_call.tool_id, ) for tool_call in self.tool_calls + if tool_call.content ) return res diff --git a/cicd/bicep/app.bicep b/cicd/bicep/app.bicep index 71a810b8..556ecdfe 100644 --- a/cicd/bicep/app.bicep +++ b/cicd/bicep/app.bicep @@ -68,6 +68,8 @@ var config = { sms: localConfig.sms cognitive_service: { endpoint: cognitiveCommunication.properties.endpoint + region: cognitiveCommunication.location + resource_id: cognitiveCommunication.id } llm: { fast: { @@ -471,6 +473,21 @@ resource assignmentsCommunicationServicesCognitiveUser 'Microsoft.Authorization/ } } +// Cognitive Services Speech User +resource roleCognitiveSpeechUser 'Microsoft.Authorization/roleDefinitions@2022-04-01' existing = { + name: 'f2dc8367-1007-4938-bd23-fe263f013447' +} + +resource assignmentsAppCognitiveSpeechUser 'Microsoft.Authorization/roleAssignments@2022-04-01' = { + name: guid(subscription().id, prefix, cognitiveCommunication.name, 'assignmentsAppCognitiveSpeechUser') + scope: cognitiveCommunication + properties: { + principalId: containerApp.identity.principalId + principalType: 'ServicePrincipal' + roleDefinitionId: roleCognitiveSpeechUser.id + } +} + resource cognitiveCommunication 'Microsoft.CognitiveServices/accounts@2024-06-01-preview' = { name: '${prefix}-${cognitiveCommunicationLocation}-communication' location: cognitiveCommunicationLocation @@ -885,10 +902,11 @@ resource configValues 'Microsoft.AppConfiguration/configurationStores/keyValues@ answer_hard_timeout_sec: 180 answer_soft_timeout_sec: 30 callback_timeout_hour: 3 - phone_silence_timeout_sec: 1 + recognition_retry_max: 2 recording_enabled: false - slow_llm_for_chat: true - voice_recognition_retry_max: 2 + slow_llm_for_chat: false + 
vad_silence_timeout_ms: 500 + vad_threshold: '0.5' }): { parent: configStore name: item.key diff --git a/public/ready.wav b/public/ready.wav deleted file mode 100644 index 542e5e6c..00000000 Binary files a/public/ready.wav and /dev/null differ diff --git a/pyproject.toml b/pyproject.toml index ba76203a..3def0df5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -12,7 +12,8 @@ dependencies = [ "aiosqlite~=0.20", # Async SQLite3 driver "azure-ai-translation-text~=1.0", # Azure Cognitive Services Text Translation "azure-appconfiguration~=1.7", # Outsourced configuration for live updates - "azure-communication-callautomation~=1.2", # Azure Communication Services Call Automation + "azure-cognitiveservices-speech~=1.41", # Azure AI Speech + "azure-communication-callautomation~=1.3.0a0", # Azure Communication Services Call Automation "azure-communication-sms~=1.1", # Azure Communication Services SMS "azure-cosmos~=4.7", # Azure Cosmos DB "azure-eventgrid~=4.20", # Azure Event Grid @@ -37,6 +38,7 @@ dependencies = [ "pydantic-extra-types~=2.9", # Extra types for Pydantic "pydantic-settings~=2.6", # Application configuration management with Pydantic "pydantic[email]~=2.9", # Data serialization and validation, plus email validation + "pydub~=0.25", # Audio processing with FFmpeg "pyjwt~=2.9", # Secure inbound calls from Communication Services "python-dotenv~=1.0", # Load environment variables from .env file "python-multipart==0.*", # Form parsing diff --git a/requirements-dev.txt b/requirements-dev.txt index 8573fc1a..801ca84d 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -147,13 +147,21 @@ azure-appconfiguration==1.7.1 \ --hash=sha256:3ebe41e9be3f4ae6ca61e5dbc42c4b7cc007a01054a8506501a26dfc199fd3ec \ --hash=sha256:6e62b040a0210071be4423aafbdca3b053884c0d412855e3f8eff8e8d0b1a02b # via call-center-ai (pyproject.toml) +azure-cognitiveservices-speech==1.41.1 \ + --hash=sha256:039eec52c0a549a30658fa24a06d42afc6366c47b03b961c0b6f730fd421293e \ + --hash=sha256:0f52f7852965bb2f5cf9aed0d3c6ef58238867bb6f0287eba95e42e1a513dd74 \ + --hash=sha256:13679949f52f89c263e8b1c6a2d0f384d663917c58b150772cf42b710a01321c \ + --hash=sha256:70030c6f1c875895eb985de3775f62349aa8687b6616afa9498466e281f178d3 \ + --hash=sha256:94ddda0deb3a9fee58a0a781b09ab8ab95401c5daf9bfc9f84ce8134d3a77055 \ + --hash=sha256:ea9d466f236598e37ea3dad1db203a2901ef91b407e435b59d9b22669324074d + # via call-center-ai (pyproject.toml) azure-common==1.1.28 \ --hash=sha256:4ac0cd3214e36b6a1b6a442686722a5d8cc449603aa833f3f0f40bda836704a3 \ --hash=sha256:5c12d3dcf4ec20599ca6b0d3e09e86e146353d443e7fcc050c9a19c1f9df20ad # via azure-search-documents -azure-communication-callautomation==1.2.0 \ - --hash=sha256:45efce4c0990e9421b3de755d592c750dabd8b19e90441e03f84e12127896461 \ - --hash=sha256:aa02155878f2540c267729657f1b12e98683bc7e7831e6d576277606ece9e3a7 +azure-communication-callautomation==1.3.0b2 \ + --hash=sha256:1b7b2164e6d3265f9ac2e7ae4c519a34724f10306e49560addcf0ec34e3ad89a \ + --hash=sha256:a1b6e4e696b1cfca7957b3de356da6cfea3b73fc54eeb27a6a2b97c872147ff7 # via call-center-ai (pyproject.toml) azure-communication-sms==1.1.0 \ --hash=sha256:3ce901924661a7e9f684c777784cdd09d0c2277489a3b563b025868f74d7a676 \ @@ -850,7 +858,6 @@ isodate==0.7.2 \ # via # azure-ai-translation-text # azure-appconfiguration - # azure-communication-callautomation # azure-eventgrid # azure-search-documents # azure-storage-queue @@ -1077,6 +1084,7 @@ msrest==0.7.1 \ --hash=sha256:21120a810e1233e5e6cc7fe40b474eeb4ec6f757a15d7cf86702c369f9567c32 \ 
--hash=sha256:6e7661f46f3afd88b75667b7187a92829924446c7ea1d169be8c4bb7eeb788b9 # via + # azure-communication-callautomation # azure-communication-sms # azure-monitor-opentelemetry-exporter multidict==6.1.0 \ @@ -1248,9 +1256,9 @@ oauthlib==3.2.2 \ --hash=sha256:8139f29aac13e25d502680e9e19963e83f16838d48a0d71c287fe40e7067fbca \ --hash=sha256:9859c40929662bec5d64f34d01c99e093149682a3f38915dc0655d5a633dd918 # via requests-oauthlib -openai==1.55.0 \ - --hash=sha256:446e08918f8dd70d8723274be860404c8c7cc46b91b93bbc0ef051f57eb503c1 \ - --hash=sha256:6c0975ac8540fe639d12b4ff5a8e0bf1424c844c4a4251148f59f06c4b2bd5db +openai==1.54.5 \ + --hash=sha256:2aab4f9755a3e1e04d8a45ac1f4ce7b6948bab76646020c6386256d7e5cbb7e0 \ + --hash=sha256:f55a4450f38501814b53e76311ed7845a6f7f35bab46d0fb2a3728035d7a72d8 # via # call-center-ai (pyproject.toml) # langchain-openai @@ -1910,6 +1918,10 @@ pydantic-settings==2.6.1 \ # via # call-center-ai (pyproject.toml) # langchain-community +pydub==0.25.1 \ + --hash=sha256:65617e33033874b59d87db603aa1ed450633288aefead953b30bded59cb599a6 \ + --hash=sha256:980a33ce9949cab2a569606b65674d748ecbca4f0796887fd6f46173a7b0d30f + # via call-center-ai (pyproject.toml) pygments==2.18.0 \ --hash=sha256:786ff802f32e91311bff3889f6e9a86e81505fe99f2735bb6d60ae0c5004f199 \ --hash=sha256:b8e6aca0523f3ab76fee51799c488e38782ac06eafcf95e7ba832985c8e7b13a @@ -2909,7 +2921,7 @@ pip==24.3.1 \ --hash=sha256:3790624780082365f47549d032f3770eeb2b1e8bd1f7b2e02dace1afa361b4ed \ --hash=sha256:ebcb60557f2aefabc2e0f918751cd24ea0d56d8ec5445fe1807f1d2109660b99 # via pip-tools -setuptools==75.6.0 \ - --hash=sha256:8199222558df7c86216af4f84c30e9b34a61d8ba19366cc914424cdbd28252f6 \ - --hash=sha256:ce74b49e8f7110f9bf04883b730f4765b774ef3ef28f722cce7c273d253aaf7d +setuptools==75.5.0 \ + --hash=sha256:5c4ccb41111392671f02bb5f8436dfc5a9a7185e80500531b133f5775c4163ef \ + --hash=sha256:87cb777c3b96d638ca02031192d40390e0ad97737e27b6b4fa831bea86f2f829 # via pip-tools diff --git a/requirements.txt b/requirements.txt index 1b803f42..0f5185aa 100644 --- a/requirements.txt +++ b/requirements.txt @@ -139,13 +139,21 @@ azure-appconfiguration==1.7.1 \ --hash=sha256:3ebe41e9be3f4ae6ca61e5dbc42c4b7cc007a01054a8506501a26dfc199fd3ec \ --hash=sha256:6e62b040a0210071be4423aafbdca3b053884c0d412855e3f8eff8e8d0b1a02b # via call-center-ai (pyproject.toml) +azure-cognitiveservices-speech==1.41.1 \ + --hash=sha256:039eec52c0a549a30658fa24a06d42afc6366c47b03b961c0b6f730fd421293e \ + --hash=sha256:0f52f7852965bb2f5cf9aed0d3c6ef58238867bb6f0287eba95e42e1a513dd74 \ + --hash=sha256:13679949f52f89c263e8b1c6a2d0f384d663917c58b150772cf42b710a01321c \ + --hash=sha256:70030c6f1c875895eb985de3775f62349aa8687b6616afa9498466e281f178d3 \ + --hash=sha256:94ddda0deb3a9fee58a0a781b09ab8ab95401c5daf9bfc9f84ce8134d3a77055 \ + --hash=sha256:ea9d466f236598e37ea3dad1db203a2901ef91b407e435b59d9b22669324074d + # via call-center-ai (pyproject.toml) azure-common==1.1.28 \ --hash=sha256:4ac0cd3214e36b6a1b6a442686722a5d8cc449603aa833f3f0f40bda836704a3 \ --hash=sha256:5c12d3dcf4ec20599ca6b0d3e09e86e146353d443e7fcc050c9a19c1f9df20ad # via azure-search-documents -azure-communication-callautomation==1.2.0 \ - --hash=sha256:45efce4c0990e9421b3de755d592c750dabd8b19e90441e03f84e12127896461 \ - --hash=sha256:aa02155878f2540c267729657f1b12e98683bc7e7831e6d576277606ece9e3a7 +azure-communication-callautomation==1.3.0b2 \ + --hash=sha256:1b7b2164e6d3265f9ac2e7ae4c519a34724f10306e49560addcf0ec34e3ad89a \ + 
--hash=sha256:a1b6e4e696b1cfca7957b3de356da6cfea3b73fc54eeb27a6a2b97c872147ff7 # via call-center-ai (pyproject.toml) azure-communication-sms==1.1.0 \ --hash=sha256:3ce901924661a7e9f684c777784cdd09d0c2277489a3b563b025868f74d7a676 \ @@ -766,7 +774,6 @@ isodate==0.7.2 \ # via # azure-ai-translation-text # azure-appconfiguration - # azure-communication-callautomation # azure-eventgrid # azure-search-documents # azure-storage-queue @@ -935,6 +942,7 @@ msrest==0.7.1 \ --hash=sha256:21120a810e1233e5e6cc7fe40b474eeb4ec6f757a15d7cf86702c369f9567c32 \ --hash=sha256:6e7661f46f3afd88b75667b7187a92829924446c7ea1d169be8c4bb7eeb788b9 # via + # azure-communication-callautomation # azure-communication-sms # azure-monitor-opentelemetry-exporter multidict==6.1.0 \ @@ -1037,9 +1045,9 @@ oauthlib==3.2.2 \ --hash=sha256:8139f29aac13e25d502680e9e19963e83f16838d48a0d71c287fe40e7067fbca \ --hash=sha256:9859c40929662bec5d64f34d01c99e093149682a3f38915dc0655d5a633dd918 # via requests-oauthlib -openai==1.55.0 \ - --hash=sha256:446e08918f8dd70d8723274be860404c8c7cc46b91b93bbc0ef051f57eb503c1 \ - --hash=sha256:6c0975ac8540fe639d12b4ff5a8e0bf1424c844c4a4251148f59f06c4b2bd5db +openai==1.54.5 \ + --hash=sha256:2aab4f9755a3e1e04d8a45ac1f4ce7b6948bab76646020c6386256d7e5cbb7e0 \ + --hash=sha256:f55a4450f38501814b53e76311ed7845a6f7f35bab46d0fb2a3728035d7a72d8 # via call-center-ai (pyproject.toml) opentelemetry-api==1.28.2 \ --hash=sha256:6fcec89e265beb258fe6b1acaaa3c8c705a934bd977b9f534a2b7c0d2d4275a6 \ @@ -1508,6 +1516,10 @@ pydantic-settings==2.6.1 \ --hash=sha256:7fb0637c786a558d3103436278a7c4f1cfd29ba8973238a50c5bb9a55387da87 \ --hash=sha256:e0f92546d8a9923cb8941689abf85d6601a8c19a23e97a34b2964a2e3f813ca0 # via call-center-ai (pyproject.toml) +pydub==0.25.1 \ + --hash=sha256:65617e33033874b59d87db603aa1ed450633288aefead953b30bded59cb599a6 \ + --hash=sha256:980a33ce9949cab2a569606b65674d748ecbca4f0796887fd6f46173a7b0d30f + # via call-center-ai (pyproject.toml) pyjwt[crypto]==2.10.0 \ --hash=sha256:543b77207db656de204372350926bed5a86201c4cbff159f623f79c7bb487a15 \ --hash=sha256:7628a7eb7938959ac1b26e819a1df0fd3259505627b575e4bad6d08f76db695c diff --git a/tests/llm.py b/tests/llm.py index 93d52925..eeeab28d 100644 --- a/tests/llm.py +++ b/tests/llm.py @@ -274,9 +274,7 @@ async def _training_callback(_call: CallStateModel) -> None: await on_call_connected( call=call, client=automation_client, - post_callback=_post_callback, server_call_id="dummy", - training_callback=_training_callback, ) # First IVR @@ -284,8 +282,6 @@ async def _training_callback(_call: CallStateModel) -> None: call=call, client=automation_client, label=call.lang.short_code, - post_callback=_post_callback, - training_callback=_training_callback, ) # Simulate conversation with speech recognition diff --git a/tests/local.py b/tests/local.py index 29c5502a..521ed44d 100644 --- a/tests/local.py +++ b/tests/local.py @@ -56,9 +56,7 @@ async def _training_callback(_call: CallStateModel) -> None: await on_call_connected( call=call, client=automation_client, - post_callback=_post_callback, server_call_id="dummy", - training_callback=_training_callback, ) # First IVR @@ -66,8 +64,6 @@ async def _training_callback(_call: CallStateModel) -> None: call=call, client=automation_client, label=call.lang.short_code, - post_callback=_post_callback, - training_callback=_training_callback, ) # Simulate conversation