Streaming

The library supports both token-based and sentence-based streaming for real-time applications.

Basic Streaming#

from primfunctions.events import Event, StartEvent, TextEvent, TextToSpeechEvent
from primfunctions.context import Context
from voicerun_completions import generate_chat_completion_stream


async def handler(event: Event, context: Context):
    """Stream an LLM reply sentence-by-sentence and speak each sentence.

    On StartEvent, plays a greeting. On TextEvent, forwards the user's text
    to the model with sentence streaming enabled and yields one TTS event
    per completed sentence.
    """
    if isinstance(event, StartEvent):
        # Opening line spoken before any user input arrives.
        yield TextToSpeechEvent(
            text="I'll tell you a story, sentence by sentence.",
            voice="kore"
        )
    if isinstance(event, TextEvent):
        spoken_text = event.data.get("text", "N/A")
        stream = await generate_chat_completion_stream(
            request={
                "provider": "anthropic",
                "api_key": context.variables.get("ANTHROPIC_API_KEY"),
                "model": "claude-haiku-4-5",
                "messages": [{"role": "user", "content": spoken_text}]
            },
            stream_options={
                "stream_sentences": True,      # Stream by sentences
                "clean_sentences": True,       # Remove markdown for TTS
                "min_sentence_length": 6,      # Minimum sentence length
                "punctuation_language": "en"   # Language-specific punctuation
            }
        )
        async for chunk in stream:
            if chunk.type == "content_sentence":
                # Each completed sentence is spoken as soon as it is ready.
                yield TextToSpeechEvent(
                    text=chunk.sentence,
                    voice="kore"
                )
            elif chunk.type == "response":
                # Full ChatCompletionResponse assembled from stream
                complete_response = chunk.response

Streaming Chunk Types#

The library yields different chunk types during streaming. See Streaming Types for complete type definitions.

from voicerun_completions import (
    AssistantMessageSentenceChunk,  # Complete sentences (stream_sentences=True)
    AssistantMessageDeltaChunk,     # Incremental text tokens
    ToolCallChunk,                  # Complete tool calls
    FinishReasonChunk,              # Why generation stopped
    UsageChunk,                     # Token usage statistics
    FinalResponseChunk,             # Complete assembled response
)


async def handler(event: Event, context: Context):
    """Illustrate every chunk type a sentence-streamed completion can yield."""
    if isinstance(event, TextEvent):
        incoming = event.data.get("text", "N/A")
        stream = await generate_chat_completion_stream(
            request={
                "provider": "anthropic",
                "api_key": context.variables.get("ANTHROPIC_API_KEY"),
                "model": "claude-haiku-4-5",
                "messages": [{"role": "user", "content": incoming}]
            },
            stream_options={"stream_sentences": True, "clean_sentences": True}
        )
        async for chunk in stream:
            if chunk.type == "content_sentence":
                # Complete sentence ready for TTS
                yield TextToSpeechEvent(
                    text=chunk.sentence,
                    voice="kore"
                )
            elif chunk.type == "tool_call":
                # Function call to execute
                pass
            elif chunk.type == "finish_reason":
                # Generation finished
                pass
            elif chunk.type == "usage":
                # Token usage statistics
                pass
            elif chunk.type == "response":
                # Full ChatCompletionResponse assembled from stream
                complete_response = chunk.response
                # complete_response.message, complete_response.finish_reason,
                # complete_response.usage

Streaming Retry Behavior#

Important: Streaming retries only occur for initial connection failures. Once streaming begins, failures are not retried to prevent duplicate content:

# ✅ RETRIES (Before streaming starts):
# - Connection failures
# - Network errors
# - Rate limits (429)
# - Server errors (500, 502, 503)
# - Authentication errors (401)
# - Timeout errors

# ❌ NO RETRY (After streaming starts):
# - Mid-stream failures raise exceptions immediately
# - This prevents duplicate content in real-time applications

Handling Streaming Errors#

Since mid-stream failures are not retried, wrap streaming in error handling for production use:

from primfunctions.events import Event, TextEvent, TextToSpeechEvent, ErrorEvent
from primfunctions.context import Context
from voicerun_completions import generate_chat_completion_stream


async def handler(event: Event, context: Context):
    """Stream a completion with error handling for mid-stream failures.

    Mid-stream failures are not retried by the library, so the whole
    streaming loop is wrapped in try/except: on any exception we emit an
    ErrorEvent for logging plus a spoken apology for the caller.
    """
    if isinstance(event, TextEvent):
        request_text = event.data.get("text", "N/A")
        try:
            stream = await generate_chat_completion_stream(
                request={
                    "provider": "anthropic",
                    "api_key": context.variables.get("ANTHROPIC_API_KEY"),
                    "model": "claude-haiku-4-5",
                    "messages": [{"role": "user", "content": request_text}]
                },
                stream_options={"stream_sentences": True, "clean_sentences": True}
            )
            async for chunk in stream:
                if chunk.type == "content_sentence":
                    yield TextToSpeechEvent(
                        text=chunk.sentence,
                        voice="kore"
                    )
                elif chunk.type == "response":
                    complete_response = chunk.response
        except Exception as e:
            # Surface the failure, then keep the conversation alive with a
            # spoken fallback instead of ending the session silently.
            yield ErrorEvent(message=f"Stream error: {e}")
            yield TextToSpeechEvent(
                text="I'm sorry, there was an error. Please try again.",
                voice="kore"
            )

Stream Options#

StreamOptions#

Options for configuring streaming behavior:

  • stream_sentences (bool): Stream by sentences instead of tokens
  • clean_sentences (bool): Clean markdown for TTS (default: True)
  • min_sentence_length (int): Minimum sentence length (default: 6)
  • punctuation_marks (Optional[List[str]]): Custom punctuation marks
  • punctuation_language (Optional[str]): Language code for punctuation
    • Supported: 'en', 'zh', 'ko', 'ja', 'es', 'fr', 'it', 'de'

Next Steps#

Tags: streaming · real-time · async