Context Reference

The Context object manages session state, configuration, and provides utilities for data management, background tasks, A/B testing, and outcome tracking.

Properties#

class Context:
    """Session state, configuration, and utilities (property overview)."""

    agent_id: str  # Unique identifier for the agent
    environment: str  # Environment name and ID (e.g., "production|a1b2c3d4-...")
    session_id: str  # Unique session identifier
    function_id: str  # Current function identifier
    variables: dict  # Environment and organization variables
    data: dict  # Custom session data store
    testing: Testing  # Testing and outcomes manager

Usage:

from primfunctions.logger import logger


async def handler(event: Event, context: Context):
    """Log the session identifiers and read configuration on session start."""
    if isinstance(event, StartEvent):
        # Access session info
        logger.info(f"Agent: {context.agent_id}")
        logger.info(f"Environment: {context.environment}")
        logger.info(f"Session: {context.session_id}")

        # Access environment variables (API keys, config)
        api_key = context.variables.get("OPENAI_API_KEY")

Data Management#

Store and retrieve session-specific data that persists throughout the conversation.

set_data#

def set_data(self, key: str, value: Any)

Set the value of a data key.

get_data#

def get_data(self, key: str, default: Any = None) -> Any

Get the value of a data key, with optional default.

Usage:

# Store session data
context.set_data("user_name", "John")
context.set_data("order_items", ["coffee", "sandwich"])
context.set_data("order_total", 15.99)

# Retrieve session data (the second argument is the fallback default)
user_name = context.get_data("user_name", "Guest")
items = context.get_data("order_items", [])

Completion Messages#

Manage conversation history for use with LLM completions via voicerun_completions.

get_completion_messages#

def get_completion_messages(self) -> list[dict[str, Any]]

Get completion messages as a list of dicts compatible with voicerun_completions.

add_completion_message#

def add_completion_message(self, message: Union[SerializableMessage, dict[str, Any]])

Add a message to the completion history. Accepts voicerun_completions message objects or dicts.

set_completion_messages#

def set_completion_messages(self, messages: list[Union[SerializableMessage, dict[str, Any]]])

Replace the entire completion message history.

Usage:

from voicerun_completions import (
    generate_chat_completion,
    deserialize_conversation,
    UserMessage,
)


async def handler(event: Event, context: Context):
    """Answer each text event with an LLM completion and persist the history."""
    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A")

        # Get existing messages and add new user message
        messages = deserialize_conversation(context.get_completion_messages())
        messages.append(UserMessage(content=user_message))

        # Generate response
        response = await generate_chat_completion({
            "provider": "anthropic",
            "api_key": context.variables.get("ANTHROPIC_API_KEY"),
            "model": "claude-sonnet-4-20250514",
            "messages": messages,
        })

        # Store updated conversation
        messages.append(response.message)
        context.set_completion_messages(messages)

        yield TextToSpeechEvent(text=response.message.content, voice="nova")

Cache#

Store and retrieve cached data that persists across the session.

cache_set#

def cache_set(self, key: str, value: Any)

Add a value to the cache. Supports CachableEntity objects, dicts, and primitives (str, int, float, bool, None).

cache_get#

def cache_get(self, key: str, entity_type: Optional[Type] = None) -> Optional[Any]

Get a value from the cache. If entity_type is provided, deserializes to that type.

Usage:

# Cache primitive values
context.cache_set("last_query", "weather in NYC")
context.cache_set("query_count", 5)

# Retrieve cached values
last_query = context.cache_get("last_query")
count = context.cache_get("query_count")

Background Tasks#

Create and manage background tasks for long-running operations that shouldn't block the conversation.

create_task#

def create_task( self, handler: Coroutine, name: str = None, interruptible: bool = False, timeout: int = 30, )

Create a background task.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| handler | Coroutine | required | Async function or generator to run |
| name | str | auto-generated | Task identifier |
| interruptible | bool | False | Can be cancelled by user interruption |
| timeout | int | 30 | Timeout in seconds |

cancel_task#

def cancel_task(self, name: str)

Cancel a specific task by name.

cancel_interruptible_tasks#

def cancel_interruptible_tasks(self)

Cancel all tasks marked as interruptible.

cancel_all_tasks#

def cancel_all_tasks(self)

Cancel all background tasks.

has_unfinished_tasks#

def has_unfinished_tasks(self) -> bool

Check if any tasks are still running.

wait_for_all_tasks#

async def wait_for_all_tasks(self)

Wait for all tasks to complete.

Usage:

import asyncio

from primfunctions.logger import logger


async def process_order(context: Context):
    """Background task that simulates order processing and records its status."""
    logger.info("Starting order processing...")

    # Simulate processing
    await asyncio.sleep(5)

    context.set_data("order_status", "completed")
    logger.info("Order processing complete")


async def handler(event: Event, context: Context):
    """Start order processing in the background, or report its current status."""
    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A").lower()

        if "place order" in user_message:
            # Start background task
            context.create_task(
                process_order(context),
                name="order_processing",
                timeout=60,
            )
            yield TextToSpeechEvent(
                text="Processing your order in the background.",
                voice="nova",
            )
        elif "check status" in user_message:
            # "pending" is reported until the background task stores a status
            status = context.get_data("order_status", "pending")
            yield TextToSpeechEvent(
                text=f"Your order status is: {status}",
                voice="nova",
            )

Tests (A/B Testing)#

Create randomized tests for experimenting with different conversation approaches.

add_test#

def add_test(self, name: str, options: dict, description: str = "", stop: dict = {}) -> str

Add a test and return its selected value.

| Parameter | Type | Description |
| --- | --- | --- |
| name | str | Test identifier |
| options | dict | Option to weight mapping (e.g., {"Hello": 0.5, "Hi": 0.5}) |
| description | str | Optional description |
| stop | dict | Stop conditions (see below) |

Stop conditions:

| Key | Type | Description |
| --- | --- | --- |
| max_iterations | int | Stop after this many runs |
| max_confidence | int | Stop when confidence reaches this level (0-100) |
| target_outcome | str | Outcome to measure confidence against |
| stop_on | int | 1 = stop when ANY condition met, 2 = stop when BOTH met |
| default | str | Value to use after test concludes |
| notify | list[str] | Email addresses to notify when test concludes |

get_test#

def get_test(self, name: str) -> str

Get the selected value for a test. Returns the same value throughout the session.

add_tests#

def add_tests(self, tests: list[dict])

Add multiple tests at once.

Usage:

async def handler(event: Event, context: Context):
    """Pick a greeting variant at session start and look it up on later events."""
    if isinstance(event, StartEvent):
        # Simple A/B test
        greeting = context.add_test(
            name="greeting_style",
            options={
                "Hello! How can I help you today?": 0.5,
                "Hi there! What can I do for you?": 0.3,
                "Welcome! How may I assist you?": 0.2,
            },
        )
        yield TextToSpeechEvent(text=greeting, voice="nova")

    if isinstance(event, TextEvent):
        # Get the same greeting value (consistent within session)
        greeting = context.get_test("greeting_style")

With stop conditions:

# A/B test that concludes according to the stop conditions below.
context.add_test(
    name="upsell_approach",
    options={"direct": 0.5, "subtle": 0.5},
    stop={
        "max_iterations": 1000,
        "max_confidence": 95,
        "target_outcome": "conversion_rate",
        "default": "direct",
        "stop_on": 2,
        "notify": ["analytics@company.com"],
    },
)

Outcomes#

Track metrics and conversion events for analytics. Outcomes are linked to tests for A/B analysis.

add_outcome#

def add_outcome(self, name: str, type: str, description: str = "")

Add an outcome to track.

| Type | Python Type | Default | Use Case |
| --- | --- | --- | --- |
| "boolean" | bool | False | Flags, conversions |
| "integer" | int | 0 | Counters |
| "float" | float | 0.0 | Monetary values, scores |
| "string" | str | "" | Categories, selections |

get_outcome#

def get_outcome(self, name: str) -> Any

Get the current value of an outcome.

set_outcome#

def set_outcome(self, name: str, value: Any) -> Any

Set the value of an outcome. Returns the previous value.

trigger_outcome#

def trigger_outcome(self, name: str) -> Any

Trigger an outcome. For booleans, sets to True. For integers, increments by 1.

reset_outcome#

def reset_outcome(self, name: str) -> Any

Reset an outcome to its default value. Returns the previous value.

add_outcomes#

def add_outcomes(self, outcomes: list[dict])

Add multiple outcomes at once.

Usage:

async def handler(event: Event, context: Context):
    """Declare outcomes at session start and update them on each text event."""
    if isinstance(event, StartEvent):
        # Define outcomes
        context.add_outcome("message_count", "integer", "Total messages")
        context.add_outcome("user_satisfied", "boolean", "User expressed satisfaction")
        context.add_outcome("order_value", "float", "Total order value")

    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A").lower()

        # Increment counter
        context.trigger_outcome("message_count")

        # Set boolean
        if "thank you" in user_message or "great" in user_message:
            context.trigger_outcome("user_satisfied")

        # Set numeric value
        if "order" in user_message:
            context.set_outcome("order_value", 29.99)

        # Get current values
        count = context.get_outcome("message_count")
        yield TextToSpeechEvent(
            text=f"This is message number {count}.",
            voice="nova",
        )

Testing Metadata#

Store arbitrary metadata associated with the testing session.

set_testing_metadata#

def set_testing_metadata(self, key: str, value: Any)

Set a metadata value.

get_testing_metadata#

def get_testing_metadata(self, key: str, default: Any = None) -> Any

Get a metadata value.

Usage:

# Store metadata about the test session
context.set_testing_metadata("segment", "returning_customer")
context.set_testing_metadata("source", "phone")

# Retrieve metadata
segment = context.get_testing_metadata("segment")
Tags: context, state, session