Streaming

generate_chat_completion_stream returns an async iterable of chunks as the provider produces them. The proxy can assemble complete sentences on the server, so you can emit a TTS event as each whole sentence arrives.

Basic streaming#

from primfunctions.events import Event, StartEvent, TextEvent, TextToSpeechEvent
from primfunctions.context import Context
from primfunctions.completions import (
    configure_provider,
    generate_chat_completion_stream,
)

async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        configure_provider("anthropic", voicerun_managed=True)
        yield TextToSpeechEvent(
            text="I'll tell you a story, sentence by sentence.",
            voice="kore",
        )

    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A")
        stream = await generate_chat_completion_stream(
            request={
                "provider": "anthropic",
                "model": "claude-haiku-4-5",
                "messages": [{"role": "user", "content": user_message}],
            },
            stream_options={
                "chunk_by_sentence": True,    # assemble sentences server-side
                "clean_sentences": True,      # strip markdown before emitting
                "min_sentence_length": 6,
                "punctuation_language": "en",
            },
        )
        async for chunk in stream:
            if chunk.type == "content_sentence":
                yield TextToSpeechEvent(text=chunk.sentence, voice="kore")
            elif chunk.type == "response":
                # The final chunk carries the fully assembled response.
                complete_response = chunk.response

Chunk types#

Each chunk is a dataclass with a .type string and type-specific fields. The proxy's emission order for a typical streaming turn:

content_delta*   — or content_sentence*, depending on stream_options
tool_call*       — if the model called any tools
finish_reason
usage
response         — final assembled ChatCompletionResponse

Import and match by type:

from primfunctions.completions import (
    ContentDeltaChunk,     # token-level deltas (chunk_by_sentence=False)
    ContentSentenceChunk,  # complete sentences (chunk_by_sentence=True)
    ToolCallChunk,         # a reassembled tool call
    FinishReasonChunk,     # why the model stopped
    UsageChunk,            # token usage
    FinalResponseChunk,    # full ChatCompletionResponse
    ErrorChunk,            # mid-stream error from the proxy
)

async for chunk in stream:
    match chunk.type:
        case "content_sentence":
            yield TextToSpeechEvent(text=chunk.sentence, voice="kore")
        case "tool_call":
            # chunk.tool_call: ToolCall with id, function.name, function.arguments
            pass
        case "finish_reason":
            # chunk.finish_reason: "stop" | "length" | "tool_calls" | ...
            pass
        case "usage":
            # chunk.usage: dict with token counts
            pass
        case "response":
            # chunk.response: full ChatCompletionResponse assembled by the proxy
            final = chunk.response

See API Reference → Streaming types for each dataclass's fields.
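For orientation, the chunks behave roughly like the dataclasses sketched below. The field names come from the examples on this page; the real classes in primfunctions.completions may carry additional fields, so treat this as an approximation:

from dataclasses import dataclass

# Approximate shapes only; import the real classes from primfunctions.completions.
@dataclass
class ContentSentenceChunk:
    type: str      # always "content_sentence"
    sentence: str  # one complete, speakable sentence

@dataclass
class FinishReasonChunk:
    type: str           # always "finish_reason"
    finish_reason: str  # "stop" | "length" | "tool_calls" | ...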

Token vs sentence streaming#

chunk_by_sentence   Emits                               Use when
False (default)     ContentDeltaChunk per token         You're rendering text to a screen and want minimal latency
True                ContentSentenceChunk per sentence   You're piping to TTS — sentence boundaries are the natural TTS unit

For voice agents, chunk_by_sentence: True is almost always what you want. The proxy handles language-specific punctuation rules so the sentences you receive are speakable.
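If you do want token-level output (say, live captions rendered alongside the audio), the same loop consumes content_delta chunks instead. A minimal sketch; the delta field name on ContentDeltaChunk is an assumption here, so check the streaming types reference:

stream = await generate_chat_completion_stream(
    request={
        "provider": "anthropic",
        "model": "claude-haiku-4-5",
        "messages": [{"role": "user", "content": user_message}],
    },
    # chunk_by_sentence defaults to False, so token-level deltas arrive.
)

parts = []
async for chunk in stream:
    if chunk.type == "content_delta":
        parts.append(chunk.delta)  # .delta is assumed; see the streaming types reference
    elif chunk.type == "response":
        complete_response = chunk.response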

Retry behavior during streaming#

Retries only fire before the stream has started producing content. Once any chunk has been yielded, the library does not retry — that would duplicate output to the user.

  • Retried (before first chunk): connection errors, timeouts, 429/5xx from the provider, auth errors
  • Not retried (after first chunk): mid-stream disconnects surface as exceptions on async for

If you have fallbacks configured and the initial connection to the primary provider fails, the proxy tries each fallback in order until one succeeds. Once streaming has begun, the current provider carries the turn to completion.
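If you want an extra application-level guard on top of this, the same rule applies: re-issue the request only if nothing has been yielded yet. A minimal sketch; the stream_with_retry helper and its single manual retry are an illustration, not built-in library behavior:

from primfunctions.completions import (
    CompletionsProxyError,
    generate_chat_completion_stream,
)

async def stream_with_retry(request, stream_options, attempts=2):
    for attempt in range(attempts):
        got_chunk = False
        try:
            stream = await generate_chat_completion_stream(
                request=request,
                stream_options=stream_options,
            )
            async for chunk in stream:
                got_chunk = True
                yield chunk
            return
        except CompletionsProxyError:
            if got_chunk or attempt == attempts - 1:
                raise  # mid-stream failure or out of attempts: don't duplicate output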

Handling errors#

Wrap the iteration in a try block so mid-stream failures don't kill the handler:

from primfunctions.events import StartEvent, TextEvent, TextToSpeechEvent, LogEvent
from primfunctions.completions import (
    CompletionsProxyError,
    configure_provider,
    generate_chat_completion_stream,
)

async def handler(event, context):
    if isinstance(event, StartEvent):
        configure_provider("anthropic", voicerun_managed=True)

    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A")
        try:
            stream = await generate_chat_completion_stream(
                request={
                    "provider": "anthropic",
                    "model": "claude-haiku-4-5",
                    "messages": [{"role": "user", "content": user_message}],
                },
                stream_options={"chunk_by_sentence": True, "clean_sentences": True},
            )
            async for chunk in stream:
                if chunk.type == "content_sentence":
                    yield TextToSpeechEvent(text=chunk.sentence, voice="kore")
        except CompletionsProxyError as e:
            yield LogEvent(f"proxy error: {e}")
            yield TextToSpeechEvent(
                text="Sorry, something went wrong. Please try again.",
                voice="kore",
            )

CompletionsProxyError carries .message, .error_type, and (for non-streaming) .status_code. Mid-stream errors that the proxy emits as an ErrorChunk on the wire are re-raised inside async for as CompletionsProxyError.
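One practical consequence: if the error lands after some sentences have already been spoken, restarting the whole turn would replay audio. A sketch of separating the two cases by tracking whether anything was emitted (the apology wording is placeholder copy):

spoke_anything = False
try:
    async for chunk in stream:
        if chunk.type == "content_sentence":
            spoke_anything = True
            yield TextToSpeechEvent(text=chunk.sentence, voice="kore")
except CompletionsProxyError as e:
    yield LogEvent(f"proxy error ({e.error_type}): {e.message}")
    if not spoke_anything:
        # Nothing reached the user yet; safe to ask them to retry the whole turn.
        yield TextToSpeechEvent(text="Sorry, please try that again.", voice="kore")
    else:
        # Speech was cut off mid-turn; acknowledge rather than replay from the start.
        yield TextToSpeechEvent(text="Sorry, I lost my train of thought.", voice="kore")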

StreamOptions reference#

Field                  Type                  Default   Meaning
chunk_by_sentence      bool                  False     Emit ContentSentenceChunk on sentence boundaries instead of ContentDeltaChunk per token
clean_sentences        bool                  True      Strip markdown formatting from sentences before emitting (makes TTS cleaner)
min_sentence_length    int                   6         Merge shorter fragments into the next sentence
punctuation_marks      Optional[list[str]]   None      Override the default punctuation set
punctuation_language   Optional[str]         None      Language code controlling sentence-end detection — en, zh, ko, ja, es, fr, it, de
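For example, to have the splitter also treat semicolons as sentence ends for TTS pacing. The default punctuation set lives server-side, so the exact list below is illustrative:

stream = await generate_chat_completion_stream(
    request={...},
    stream_options={
        "chunk_by_sentence": True,
        # Illustrative override: also break sentences at semicolons.
        "punctuation_marks": [".", "!", "?", ";"],
    },
)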

Deprecated: stream_sentences is the previous name for chunk_by_sentence. It still works for backward compatibility but emits a deprecation warning via primfunctions.logger. Update existing handlers to chunk_by_sentence.
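The rename is mechanical:

# Before (deprecated; logs a warning via primfunctions.logger):
stream_options={"stream_sentences": True}

# After:
stream_options={"chunk_by_sentence": True}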

Pass stream_options as a dict or as a StreamOptions dataclass:

from primfunctions.completions import StreamOptions

stream = await generate_chat_completion_stream(
    request={...},
    stream_options=StreamOptions(
        chunk_by_sentence=True,
        clean_sentences=True,
        punctuation_language="en",
    ),
)
