Examples

Multi-turn conversation with tool calls, retries, and fallbacks

Non-streaming. The primary provider is Anthropic; if the Anthropic request fails, the handler retries it per the RetryConfiguration and then falls back to OpenAI, then Google. Every provider the handler might use, including the fallbacks, is registered in StartEvent. A sketch of a real get_weather backend follows the example.

```python
from primfunctions.events import Event, StartEvent, TextEvent, TextToSpeechEvent
from primfunctions.context import Context
from primfunctions.completions import (
    ChatCompletionRequest,
    CompletionsProvider,
    ConversationHistory,
    FallbackRequest,
    FunctionDefinition,
    RetryConfiguration,
    ToolDefinition,
    ToolResultMessage,
    UserMessage,
    configure_provider,
    deserialize_conversation,
    generate_chat_completion,
)


async def get_weather(location: str) -> dict:
    # Replace with your real weather backend.
    return {"temperature": 72, "condition": "sunny", "location": location}


async def execute_tool(name: str, arguments: dict) -> dict:
    if name == "get_weather":
        return await get_weather(arguments["location"])
    raise ValueError(f"Unknown tool: {name}")


async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        # Register every provider the handler might touch this session.
        configure_provider("anthropic", voicerun_managed=True)
        configure_provider("openai", voicerun_managed=True)
        configure_provider("google", voicerun_managed=True)
        yield TextToSpeechEvent(
            text="I can check the weather for multiple locations.",
            voice="kore",
        )

    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A")
        messages: ConversationHistory = deserialize_conversation(
            context.get_completion_messages()
        )
        messages.append(UserMessage(content=user_message))

        tools = [
            ToolDefinition(
                type="function",
                function=FunctionDefinition(
                    name="get_weather",
                    description="Get weather for a location",
                    parameters={
                        "type": "object",
                        "properties": {"location": {"type": "string"}},
                        "required": ["location"],
                    },
                ),
            ),
        ]

        request = ChatCompletionRequest(
            provider=CompletionsProvider.ANTHROPIC,
            model="claude-haiku-4-5",
            messages=messages,
            tools=tools,
            tool_choice="auto",
            temperature=0.7,
            max_tokens=500,
            timeout=30.0,
            retry=RetryConfiguration(
                enabled=True,
                max_retries=3,
                retry_delay=1.0,
                backoff_multiplier=2.0,
            ),
            fallbacks=[
                FallbackRequest(
                    provider=CompletionsProvider.OPENAI,
                    model="gpt-4.1-mini",
                ),
                FallbackRequest(
                    provider=CompletionsProvider.GOOGLE,
                    model="gemini-2.5-flash",
                ),
            ],
        )

        # First turn — model may emit tool calls
        response = await generate_chat_completion(request)
        messages.append(response.message)

        if response.message.tool_calls:
            # Execute each tool call and feed results back
            for tool_call in response.message.tool_calls:
                result = await execute_tool(
                    tool_call.function.name,
                    tool_call.function.arguments,
                )
                messages.append(ToolResultMessage(
                    tool_call_id=tool_call.id,
                    name=tool_call.function.name,
                    content=result,
                ))

            # Second turn — model uses the tool results
            request.messages = messages
            response = await generate_chat_completion(request)
            messages.append(response.message)

        context.set_completion_messages(messages)

        if response.message.content:
            yield TextToSpeechEvent(text=response.message.content, voice="kore")
```
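The get_weather stub above returns canned data. Below is a minimal sketch of a real backend, assuming Open-Meteo's free geocoding and forecast endpoints and the httpx client; neither is part of primfunctions, and any async HTTP client works the same way.

```python
import httpx


async def get_weather(location: str) -> dict:
    # Sketch only: resolve the location name to coordinates, then fetch
    # current conditions. Both endpoints are Open-Meteo's public API.
    async with httpx.AsyncClient(timeout=10.0) as client:
        geo = await client.get(
            "https://geocoding-api.open-meteo.com/v1/search",
            params={"name": location, "count": 1},
        )
        geo.raise_for_status()
        results = geo.json().get("results") or []
        if not results:
            return {"error": f"unknown location: {location}"}
        place = results[0]

        forecast = await client.get(
            "https://api.open-meteo.com/v1/forecast",
            params={
                "latitude": place["latitude"],
                "longitude": place["longitude"],
                "current_weather": True,
            },
        )
        forecast.raise_for_status()
        current = forecast.json().get("current_weather", {})
        return {
            "temperature": current.get("temperature"),
            "windspeed": current.get("windspeed"),
            "location": place.get("name", location),
        }
```

Keep the returned dict small and JSON-serializable: it is appended to the conversation as a tool result, so everything in it is sent back to the model on the second turn.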

Streaming multi-turn with tool calls, retries, and fallbacks

Streaming. Same architecture, but the request is built from plain dicts rather than the typed classes, and sentences are emitted to TTS as they arrive. The final response chunk carries the assembled ChatCompletionResponse for history persistence. A registry-based tool dispatch sketch follows the example.

```python
from primfunctions.events import Event, StartEvent, TextEvent, TextToSpeechEvent
from primfunctions.context import Context
from primfunctions.completions import (
    ConversationHistory,
    ToolResultMessage,
    UserMessage,
    configure_provider,
    deserialize_conversation,
    generate_chat_completion_stream,
)


async def get_weather(location: str) -> dict:
    return {"temperature": 72, "condition": "sunny", "location": location}


async def execute_tool(name: str, arguments: dict) -> dict:
    if name == "get_weather":
        return await get_weather(arguments["location"])
    raise ValueError(f"Unknown tool: {name}")


async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        configure_provider("anthropic", voicerun_managed=True)
        configure_provider("openai", voicerun_managed=True)
        configure_provider("google", voicerun_managed=True)
        yield TextToSpeechEvent(
            text="I can check the weather for multiple locations.",
            voice="kore",
        )

    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A")
        messages: ConversationHistory = deserialize_conversation(
            context.get_completion_messages()
        )
        messages.append(UserMessage(content=user_message))

        tools = [
            {
                "type": "function",
                "function": {
                    "name": "get_weather",
                    "description": "Get weather for a location",
                    "parameters": {
                        "type": "object",
                        "properties": {"location": {"type": "string"}},
                        "required": ["location"],
                    },
                },
            },
        ]

        request = {
            "provider": "anthropic",
            "model": "claude-haiku-4-5",
            "messages": messages,
            "tools": tools,
            "tool_choice": "auto",
            "temperature": 0.7,
            "max_tokens": 500,
            "timeout": 30.0,
            "retry": {
                "max_retries": 3,
                "retry_delay": 1.0,
                "backoff_multiplier": 2.0,
            },
            "fallbacks": [
                {"provider": "openai", "model": "gpt-4.1-mini"},
                {"provider": "google", "model": "gemini-2.5-flash"},
            ],
        }

        # First turn — stream; capture tool calls as they arrive
        stream = await generate_chat_completion_stream(
            request=request,
            stream_options={"chunk_by_sentence": True, "clean_sentences": True},
        )
        tool_calls = []
        async for chunk in stream:
            if chunk.type == "content_sentence":
                yield TextToSpeechEvent(text=chunk.sentence, voice="kore")
            elif chunk.type == "tool_call":
                tool_calls.append(chunk.tool_call)
            elif chunk.type == "response":
                messages.append(chunk.response.message)

        # Execute tools and re-call the model with results
        if tool_calls:
            for tool_call in tool_calls:
                result = await execute_tool(
                    tool_call.function.name,
                    tool_call.function.arguments,
                )
                messages.append(ToolResultMessage(
                    tool_call_id=tool_call.id,
                    name=tool_call.function.name,
                    content=result,
                ))

            request["messages"] = messages
            stream = await generate_chat_completion_stream(
                request=request,
                stream_options={"chunk_by_sentence": True, "clean_sentences": True},
            )
            async for chunk in stream:
                if chunk.type == "content_sentence":
                    yield TextToSpeechEvent(text=chunk.sentence, voice="kore")
                elif chunk.type == "response":
                    messages.append(chunk.response.message)

        context.set_completion_messages(messages)
```
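With one tool, the if name == ... dispatch in execute_tool is fine; with several, a registry keeps each tool's schema and implementation together and generates the tools list automatically. Here is a sketch in plain Python; the TOOLS registry and make_tool_definitions helper are illustrative names, not primfunctions API.

```python
from typing import Any


async def get_weather(location: str) -> dict:
    # Stub from the examples above; swap in a real backend.
    return {"temperature": 72, "condition": "sunny", "location": location}


# Illustrative registry: tool name -> JSON-schema parameters plus coroutine.
TOOLS: dict[str, dict[str, Any]] = {
    "get_weather": {
        "description": "Get weather for a location",
        "parameters": {
            "type": "object",
            "properties": {"location": {"type": "string"}},
            "required": ["location"],
        },
        "func": get_weather,
    },
}


def make_tool_definitions() -> list[dict]:
    # Emits the dict-style tools list used in the streaming example.
    return [
        {
            "type": "function",
            "function": {
                "name": name,
                "description": spec["description"],
                "parameters": spec["parameters"],
            },
        }
        for name, spec in TOOLS.items()
    ]


async def execute_tool(name: str, arguments: dict) -> dict:
    spec = TOOLS.get(name)
    if spec is None:
        raise ValueError(f"Unknown tool: {name}")
    return await spec["func"](**arguments)
```

In the handler, tools = make_tool_definitions() replaces the hand-written list, and adding a tool becomes a single registry entry.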

Next steps

Browse the rest of the examples, or continue with the tutorials.