Context Reference
The Context object manages session state and configuration, and provides utilities for data management, completion messages, caching, background tasks, A/B testing, and outcome tracking.
## Properties
class Context: agent_id: str # Unique identifier for the agent environment: str # Environment name and ID (e.g., "production|a1b2c3d4-...") session_id: str # Unique session identifier function_id: str # Current function identifier variables: dict # Environment and organization variables data: dict # Custom session data store testing: Testing # Testing and outcomes manager
Usage:
from primfunctions.logger import logger async def handler(event: Event, context: Context): if isinstance(event, StartEvent): # Access session info logger.info(f"Agent: {context.agent_id}") logger.info(f"Environment: {context.environment}") logger.info(f"Session: {context.session_id}") # Access environment variables (API keys, config) api_key = context.variables.get("OPENAI_API_KEY")
## Data Management
Store and retrieve session-specific data that persists throughout the conversation.
### set_data
def set_data(self, key: str, value: Any)
Set the value of a data key.
### get_data
def get_data(self, key: str, default: Any = None) -> Any
Get the value of a data key, with optional default.
Usage:
# Store session data context.set_data("user_name", "John") context.set_data("order_items", ["coffee", "sandwich"]) context.set_data("order_total", 15.99) # Retrieve session data user_name = context.get_data("user_name", "Guest") items = context.get_data("order_items", [])
## Completion Messages
Manage conversation history for use with LLM completions via voicerun_completions.
### get_completion_messages
def get_completion_messages(self) -> list[dict[str, Any]]
Get completion messages as a list of dicts compatible with voicerun_completions.
### add_completion_message
def add_completion_message(self, message: Union[SerializableMessage, dict[str, Any]])
Add a message to the completion history. Accepts voicerun_completions message objects or dicts.
### set_completion_messages
def set_completion_messages(self, messages: list[Union[SerializableMessage, dict[str, Any]]])
Replace the entire completion message history.
Usage:
from voicerun_completions import ( generate_chat_completion, deserialize_conversation, UserMessage, ) async def handler(event: Event, context: Context): if isinstance(event, TextEvent): user_message = event.data.get("text", "N/A") # Get existing messages and add new user message messages = deserialize_conversation(context.get_completion_messages()) messages.append(UserMessage(content=user_message)) # Generate response response = await generate_chat_completion({ "provider": "anthropic", "api_key": context.variables.get("ANTHROPIC_API_KEY"), "model": "claude-sonnet-4-20250514", "messages": messages, }) # Store updated conversation messages.append(response.message) context.set_completion_messages(messages) yield TextToSpeechEvent(text=response.message.content, voice="nova")
## Cache
Store and retrieve cached data that persists across the session.
### cache_set
def cache_set(self, key: str, value: Any)
Add a value to the cache. Supports CachableEntity objects, dicts, and primitives (str, int, float, bool, None).
### cache_get
def cache_get(self, key: str, entity_type: Optional[Type] = None) -> Optional[Any]
Get a value from the cache. If entity_type is provided, deserializes to that type.
Usage:
# Cache primitive values context.cache_set("last_query", "weather in NYC") context.cache_set("query_count", 5) # Retrieve cached values last_query = context.cache_get("last_query") count = context.cache_get("query_count")
## Background Tasks
Create and manage background tasks for long-running operations that shouldn't block the conversation.
### create_task
def create_task( self, handler: Coroutine, name: str = None, interruptible: bool = False, timeout: int = 30, )
Create a background task.
| Parameter | Type | Default | Description |
|---|---|---|---|
| handler | Coroutine | required | Async function or generator to run |
| name | str | auto-generated | Task identifier |
| interruptible | bool | False | Can be cancelled by user interruption |
| timeout | int | 30 | Timeout in seconds |
### cancel_task
def cancel_task(self, name: str)
Cancel a specific task by name.
### cancel_interruptible_tasks
def cancel_interruptible_tasks(self)
Cancel all tasks marked as interruptible.
### cancel_all_tasks
def cancel_all_tasks(self)
Cancel all background tasks.
### has_unfinished_tasks
def has_unfinished_tasks(self) -> bool
Check if any tasks are still running.
### wait_for_all_tasks
async def wait_for_all_tasks(self)
Wait for all tasks to complete.
Usage:
from primfunctions.logger import logger async def process_order(context: Context): """Background task that yields events.""" logger.info("Starting order processing...") # Simulate processing await asyncio.sleep(5) context.set_data("order_status", "completed") logger.info("Order processing complete") async def handler(event: Event, context: Context): if isinstance(event, TextEvent): user_message = event.data.get("text", "N/A").lower() if "place order" in user_message: # Start background task context.create_task( process_order(context), name="order_processing", timeout=60 ) yield TextToSpeechEvent( text="Processing your order in the background.", voice="nova" ) elif "check status" in user_message: status = context.get_data("order_status", "pending") yield TextToSpeechEvent( text=f"Your order status is: {status}", voice="nova" )
## Tests (A/B Testing)
Create randomized tests for experimenting with different conversation approaches.
### add_test
def add_test(self, name: str, options: dict, description: str = "", stop: dict = {}) -> str
Add a test and return its selected value.
| Parameter | Type | Description |
|---|---|---|
| name | str | Test identifier |
| options | dict | Option-to-weight mapping (e.g., {"Hello": 0.5, "Hi": 0.5}) |
| description | str | Optional description |
| stop | dict | Stop conditions (see below) |
Stop conditions:
| Key | Type | Description |
|---|---|---|
| max_iterations | int | Stop after this many runs |
| max_confidence | int | Stop when confidence reaches this level (0-100) |
| target_outcome | str | Outcome to measure confidence against |
| stop_on | int | 1 = stop when ANY condition met, 2 = stop when BOTH met |
| default | str | Value to use after test concludes |
| notify | list[str] | Email addresses to notify when test concludes |
### get_test
def get_test(self, name: str) -> str
Get the selected value for a test. Returns the same value throughout the session.
### add_tests
def add_tests(self, tests: list[dict])
Add multiple tests at once.
Usage:
async def handler(event: Event, context: Context): if isinstance(event, StartEvent): # Simple A/B test greeting = context.add_test( name="greeting_style", options={ "Hello! How can I help you today?": 0.5, "Hi there! What can I do for you?": 0.3, "Welcome! How may I assist you?": 0.2 } ) yield TextToSpeechEvent(text=greeting, voice="nova") if isinstance(event, TextEvent): # Get the same greeting value (consistent within session) greeting = context.get_test("greeting_style")
With stop conditions:
context.add_test( name="upsell_approach", options={"direct": 0.5, "subtle": 0.5}, stop={ "max_iterations": 1000, "max_confidence": 95, "target_outcome": "conversion_rate", "default": "direct", "stop_on": 2, "notify": ["analytics@company.com"] } )
## Outcomes
Track metrics and conversion events for analytics. Outcomes are linked to tests for A/B analysis.
### add_outcome
def add_outcome(self, name: str, type: str, description: str = "")
Add an outcome to track.
| Type | Python Type | Default | Use Case |
|---|---|---|---|
| "boolean" | bool | False | Flags, conversions |
| "integer" | int | 0 | Counters |
| "float" | float | 0.0 | Monetary values, scores |
| "string" | str | "" | Categories, selections |
### get_outcome
def get_outcome(self, name: str) -> Any
Get the current value of an outcome.
### set_outcome
def set_outcome(self, name: str, value: Any) -> Any
Set the value of an outcome. Returns the previous value.
### trigger_outcome
def trigger_outcome(self, name: str) -> Any
Trigger an outcome. For booleans, sets to True. For integers, increments by 1.
### reset_outcome
def reset_outcome(self, name: str) -> Any
Reset an outcome to its default value. Returns the previous value.
### add_outcomes
def add_outcomes(self, outcomes: list[dict])
Add multiple outcomes at once.
Usage:
async def handler(event: Event, context: Context): if isinstance(event, StartEvent): # Define outcomes context.add_outcome("message_count", "integer", "Total messages") context.add_outcome("user_satisfied", "boolean", "User expressed satisfaction") context.add_outcome("order_value", "float", "Total order value") if isinstance(event, TextEvent): user_message = event.data.get("text", "N/A").lower() # Increment counter context.trigger_outcome("message_count") # Set boolean if "thank you" in user_message or "great" in user_message: context.trigger_outcome("user_satisfied") # Set numeric value if "order" in user_message: context.set_outcome("order_value", 29.99) # Get current values count = context.get_outcome("message_count") yield TextToSpeechEvent( text=f"This is message number {count}.", voice="nova" )
## Testing Metadata
Store arbitrary metadata associated with the testing session.
### set_testing_metadata
def set_testing_metadata(self, key: str, value: Any)
Set a metadata value.
### get_testing_metadata
def get_testing_metadata(self, key: str, default: Any = None) -> Any
Get a metadata value.
Usage:
# Store metadata about the test session context.set_testing_metadata("segment", "returning_customer") context.set_testing_metadata("source", "phone") # Retrieve metadata segment = context.get_testing_metadata("segment")
