Context Reference
The Context object manages session state and configuration, and provides utilities for data management, background tasks, A/B testing, and outcome tracking.
Properties#
```python
class Context:
    agent_id: str      # Unique identifier for the agent
    environment: str   # Environment name and ID (e.g., "production|a1b2c3d4-...")
    session_id: str    # Unique session identifier
    function_id: str   # Current function identifier
    variables: dict    # Environment and organization variables
    data: dict         # Custom session data store
    testing: Testing   # Testing and outcomes manager
```
Usage:
```python
from primfunctions.logger import logger

async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        # Access session info
        logger.info(f"Agent: {context.agent_id}")
        logger.info(f"Environment: {context.environment}")
        logger.info(f"Session: {context.session_id}")

        # Access environment variables (API keys, config)
        api_key = context.variables.get("OPENAI_API_KEY")
```
Variables#
context.variables is a plain dict[str, str] populated when the agent session starts. It holds configuration values — API keys, endpoints, feature flags — that you can read from anywhere inside your handler with standard dict access.
```python
api_key = context.variables.get("ANTHROPIC_API_KEY")
region = context.variables.get("AWS_REGION", "us-east-1")  # with default
```
Where variables come from#
Variables are merged from two scopes when the session starts:
| Scope | Entity | Visibility |
|---|---|---|
| Organization | OrganizationVariable | Every agent in your organization |
| Environment | AgentEnvironmentVariable | Only sessions running in that specific agent environment (e.g. production vs staging) |
If a variable with the same name exists at both scopes, the environment-scoped value wins.
Keys are case-sensitive. context.variables.get("API_KEY") will not return a variable created as api_key.
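The precedence and case-sensitivity rules above can be illustrated with plain dicts. This is only a sketch of the documented behavior, not the platform's implementation; `merge_variables` and the sample values are hypothetical.

```python
def merge_variables(org_vars: dict[str, str], env_vars: dict[str, str]) -> dict[str, str]:
    """Hypothetical helper: combine both scopes, letting the environment scope win."""
    # Later entries override earlier ones, so environment values replace
    # organization values that share the same (case-sensitive) name.
    return {**org_vars, **env_vars}


# Sample values are made up for illustration.
org_vars = {"SUPPORT_EMAIL": "help@acme.com", "API_BASE": "https://api.example.com"}
env_vars = {"API_BASE": "https://staging.example.com", "api_key": "dummy"}

merged = merge_variables(org_vars, env_vars)
print(merged["API_BASE"])     # the environment-scoped value
print(merged.get("API_KEY"))  # None, because the key was created as "api_key"
```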
Creating variables#
Use the VoiceRun CLI to create variables:
```shell
# Organization-level (every agent can read it)
vr create variable SUPPORT_EMAIL help@acme.com --org

# Environment-scoped, visible only to sessions in the "production" environment
vr create variable ANTHROPIC_API_KEY sk-ant-... \
  --agent my-agent --environment production --masked
```
Pass --masked for sensitive values like API keys. Masked values are still delivered to the agent runtime (so context.variables.get() returns the real value) but are redacted in CLI listings, vr describe variable, and the web UI.
Inside a voicerun project, the --agent flag defaults to the agent in .voicerun/agent.lock, so you can usually omit it:
vr create variable ANTHROPIC_API_KEY sk-ant-... --environment production --masked
Variables vs. secrets#
VoiceRun also exposes a separate concept called "secrets" (vr create secret, OrganizationSecret). Secrets are not currently injected into context.variables — they're used only by evaluators running during test and evaluation workflows. Runtime injection of agent-scoped secrets is on the roadmap but not yet supported.
If you want a value readable via context.variables.get() in your handler, create a variable (with --masked if sensitive), not a secret.
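Because `context.variables` is a plain dict, a missing or misspelled name silently yields `None`. A minimal sketch of a fail-fast lookup follows; `require_variable` is a hypothetical helper, not part of the SDK, and the dict below stands in for `context.variables`.

```python
def require_variable(variables: dict[str, str], name: str) -> str:
    """Hypothetical helper: return a variable or fail with a clear message."""
    value = variables.get(name)
    if not value:
        raise KeyError(
            f"Variable {name!r} is not set; names are case-sensitive "
            "and secrets are not injected into context.variables"
        )
    return value


# Stand-in for context.variables inside a handler.
variables = {"ANTHROPIC_API_KEY": "dummy-value"}
api_key = require_variable(variables, "ANTHROPIC_API_KEY")
```

Inside a handler, the same call would read `require_variable(context.variables, "ANTHROPIC_API_KEY")`.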
Data Management#
Store and retrieve session-specific data that persists throughout the conversation.
set_data#
def set_data(self, key: str, value: Any)
Set the value of a data key.
get_data#
def get_data(self, key: str, default: Any = None) -> Any
Get the value of a data key, with optional default.
Usage:
```python
# Store session data
context.set_data("user_name", "John")
context.set_data("order_items", ["coffee", "sandwich"])
context.set_data("order_total", 15.99)

# Retrieve session data
user_name = context.get_data("user_name", "Guest")
items = context.get_data("order_items", [])
```
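A common pattern is accumulating a value across turns. The sketch below uses `SessionDataStub`, a hypothetical stand-in that mirrors only the documented `set_data`/`get_data` signatures so the snippet runs outside the platform; `add_order_item` is likewise illustrative.

```python
from typing import Any


class SessionDataStub:
    """Hypothetical stand-in exposing only the documented set_data/get_data signatures."""

    def __init__(self) -> None:
        self._data: dict[str, Any] = {}

    def set_data(self, key: str, value: Any) -> None:
        self._data[key] = value

    def get_data(self, key: str, default: Any = None) -> Any:
        return self._data.get(key, default)


def add_order_item(context: Any, item: str, price: float) -> None:
    """Append an item and keep a running total in session data."""
    # Copy the list so the stored value is never mutated in place.
    items = list(context.get_data("order_items", []))
    items.append(item)
    context.set_data("order_items", items)
    total = context.get_data("order_total", 0.0) + price
    context.set_data("order_total", round(total, 2))


context = SessionDataStub()
add_order_item(context, "coffee", 4.50)
add_order_item(context, "sandwich", 11.49)
```

In a real handler, the `context` argument passed to your handler would take the place of the stub.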
Subagent Input & Output#
Agents can be invoked as subagents: the caller hands the platform a task spec when the session is created, the handler runs autonomously, and a structured result is persisted back onto the session at session end. Use this pattern when an outbound call (or any pre-created session) needs to return a structured outcome to whatever kicked it off — for example, "did the booking get confirmed, and what's the confirmation number?"
The two surfaces involved:
| Field | Direction | When written | When read |
|---|---|---|---|
| context.input_data | Caller → handler | At session creation (/v1/entrypoints/:id/start or /outbound) | Anywhere in the handler |
| context.output_data | Handler → caller | Anywhere in the handler via set_output | Persisted once at session end |
Both fields are JSON objects capped at 64KB. The top-level value must be a dict.
input_data#
context.input_data: dict
Read-only snapshot of the JSON the caller passed in when the session was created. Always a dict — empty when no payload was provided.
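Since `input_data` is always a dict, it can be read with `.get()` even when the caller sent nothing. A minimal sketch of turning it into a typed task spec follows; `TaskSpec`, `parse_task`, and the `customer_name` key are hypothetical, and the real keys are whatever your caller chooses to send.

```python
from dataclasses import dataclass


@dataclass
class TaskSpec:
    """Hypothetical task shape used only for this illustration."""
    objective: str
    customer_name: str = "there"


def parse_task(input_data: dict) -> TaskSpec:
    # input_data is documented as always a dict, so .get() is safe even
    # when no payload was provided at session creation.
    return TaskSpec(
        objective=input_data.get("objective", "unspecified"),
        customer_name=input_data.get("customer_name", "there"),
    )


empty_task = parse_task({})
booking_task = parse_task({"objective": "confirm booking", "customer_name": "Sam"})
```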
set_output#
def set_output(self, payload: dict) -> None
Replace the output payload. Raises TypeError if payload is not a dict and ValueError if it's not JSON-serializable or exceeds the 64KB cap.
To build the output incrementally across turns, spread the current value into the new payload:
context.set_output({**context.output_data, "field": value})
Usage:
```python
from primfunctions.events import StartEvent, StopEvent, TextEvent, TextToSpeechEvent
from primfunctions.logger import logger

async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        # Read the task spec the caller provided.
        task = context.input_data
        logger.info(f"Running outbound task: {task.get('objective')}")
        yield TextToSpeechEvent(text="Hi, I'm calling about your booking.", voice="nova")

    if isinstance(event, TextEvent):
        # ... drive the conversation, extracting fields as you go ...
        if confirmed_a_thing:
            context.set_output({
                **context.output_data,
                "status": "confirmed",
                "confirmation_number": "ABC123",
            })

    if isinstance(event, StopEvent):
        # output_data has already been built up; the platform persists it
        # onto the session at session end. No explicit "send" is needed.
        logger.info(f"Final output: {context.output_data}")
```
The caller picks up the result either by polling vr session info for the outputData field, by passing --wait to vr outbound call, or — once the session-end webhook is extended — by receiving it as part of the webhook payload.
Persistence semantics: the platform reads context.output_data once, at session end, and writes it onto the session record. Multiple set_output calls during the call are fine; only the final value is persisted.
Size cap: the 64KB limit is enforced eagerly in the SDK and again at the API. Use output_data for structured results, not transcripts (those have their own field).
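If you want to check a payload before handing it to `set_output`, the documented rules can be approximated locally. The sketch below is an assumption-laden pre-check, not the SDK's own validation: `check_output_payload` is hypothetical, and it assumes "64KB" means 65,536 bytes of UTF-8 encoded JSON.

```python
import json

MAX_OUTPUT_BYTES = 64 * 1024  # assumption: 64KB measured on the encoded JSON


def check_output_payload(payload: object) -> int:
    """Hypothetical pre-check mirroring the documented set_output errors.

    Returns the encoded size in bytes when the payload looks acceptable.
    """
    if not isinstance(payload, dict):
        raise TypeError("output payload must be a dict")
    try:
        encoded = json.dumps(payload).encode("utf-8")
    except (TypeError, ValueError) as exc:
        raise ValueError(f"output payload is not JSON-serializable: {exc}") from exc
    if len(encoded) > MAX_OUTPUT_BYTES:
        raise ValueError(
            f"output payload is {len(encoded)} bytes; the cap is {MAX_OUTPUT_BYTES}"
        )
    return len(encoded)


size = check_output_payload({"status": "confirmed", "confirmation_number": "ABC123"})
```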
Completion Messages#
Manage conversation history for use with LLM completions via VoiceRun Completions (primfunctions.completions).
get_completion_messages#
def get_completion_messages(self) -> list[dict[str, Any]]
Return the stored conversation history as a list of dicts. Use primfunctions.completions.deserialize_conversation to convert to typed message objects.
add_completion_message#
def add_completion_message(self, message: Union[SerializableMessage, dict[str, Any]])
Append a message to the history. Accepts any primfunctions.completions message dataclass (UserMessage, AssistantMessage, SystemMessage, ToolResultMessage) or a plain dict.
set_completion_messages#
def set_completion_messages(self, messages: list[Union[SerializableMessage, dict[str, Any]]])
Replace the entire history. The list may mix typed messages and dicts.
Usage:
```python
from primfunctions.completions import (
    UserMessage,
    configure_provider,
    deserialize_conversation,
    generate_chat_completion,
)

async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        configure_provider("google", voicerun_managed=True)

    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A")

        # Get existing messages and add new user message
        messages = deserialize_conversation(context.get_completion_messages())
        messages.append(UserMessage(content=user_message))

        # Generate response
        response = await generate_chat_completion({
            "provider": "google",
            "model": "gemini-3.5-flash",
            "messages": messages,
        })

        # Store updated conversation
        messages.append(response.message)
        context.set_completion_messages(messages)

        yield TextToSpeechEvent(text=response.message.content, voice="nova")
```
Cache#
Store and retrieve cached data that persists across the session.
cache_set#
def cache_set(self, key: str, value: Any)
Add a value to the cache. Supports CachableEntity objects, dicts, and primitives (str, int, float, bool, None).
cache_get#
def cache_get(self, key: str, entity_type: Optional[Type] = None) -> Optional[Any]
Get a value from the cache. If entity_type is provided, deserializes to that type.
Usage:
```python
# Cache primitive values
context.cache_set("last_query", "weather in NYC")
context.cache_set("query_count", 5)

# Retrieve cached values
last_query = context.cache_get("last_query")
count = context.cache_get("query_count")
```
Background Tasks#
Create and manage background tasks for long-running operations that shouldn't block the conversation.
create_task#
def create_task(self, handler: Coroutine, name: str = None, interruptible: bool = False, timeout: int = 30)
Create a background task.
| Parameter | Type | Default | Description |
|---|---|---|---|
| handler | Coroutine | required | Async function or generator to run |
| name | str | auto-generated | Task identifier |
| interruptible | bool | False | Can be cancelled by user interruption |
| timeout | int | 30 | Timeout in seconds |
cancel_task#
def cancel_task(self, name: str)
Cancel a specific task by name.
cancel_interruptible_tasks#
def cancel_interruptible_tasks(self)
Cancel all tasks marked as interruptible.
cancel_all_tasks#
def cancel_all_tasks(self)
Cancel all background tasks.
has_unfinished_tasks#
def has_unfinished_tasks(self) -> bool
Check if any tasks are still running.
wait_for_all_tasks#
async def wait_for_all_tasks(self)
Wait for all tasks to complete.
Usage:
```python
import asyncio

from primfunctions.events import TextEvent, TextToSpeechEvent
from primfunctions.logger import logger

async def process_order(context: Context):
    """Background task that records the order status in session data."""
    logger.info("Starting order processing...")
    # Simulate processing
    await asyncio.sleep(5)
    context.set_data("order_status", "completed")
    logger.info("Order processing complete")

async def handler(event: Event, context: Context):
    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A").lower()

        if "place order" in user_message:
            # Start background task
            context.create_task(
                process_order(context),
                name="order_processing",
                timeout=60
            )
            yield TextToSpeechEvent(
                text="Processing your order in the background.",
                voice="nova"
            )
        elif "check status" in user_message:
            status = context.get_data("order_status", "pending")
            yield TextToSpeechEvent(
                text=f"Your order status is: {status}",
                voice="nova"
            )
```
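The `timeout` parameter bounds how long a background task may run. How the platform enforces it is not described on this page; as a rough analogy in plain asyncio (not the SDK's implementation), a coroutine can be wrapped with `asyncio.wait_for`. The names `slow_job` and `run_with_timeout` are hypothetical.

```python
import asyncio


async def slow_job(delay: float) -> str:
    """Hypothetical long-running operation."""
    await asyncio.sleep(delay)
    return "completed"


async def run_with_timeout(coro, timeout: float) -> str:
    """Await a coroutine, reporting "timed_out" instead of raising."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return "timed_out"


async def main() -> list[str]:
    # Schedule both jobs concurrently so neither blocks the other.
    fast = asyncio.create_task(run_with_timeout(slow_job(0.01), timeout=1.0))
    slow = asyncio.create_task(run_with_timeout(slow_job(1.0), timeout=0.05))
    return list(await asyncio.gather(fast, slow))


results = asyncio.run(main())
```

The first job finishes inside its budget, while the second is cancelled once its budget runs out, which is the same trade-off you make when choosing a `timeout` for `create_task`.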
Tests (A/B Testing)#
Create randomized tests for experimenting with different conversation approaches.
add_test#
def add_test(self, name: str, options: dict, description: str = "", stop: dict = {}) -> str
Add a test and return its selected value.
| Parameter | Type | Description |
|---|---|---|
| name | str | Test identifier |
| options | dict | Option to weight mapping (e.g., {"Hello": 0.5, "Hi": 0.5}) |
| description | str | Optional description |
| stop | dict | Stop conditions (see below) |
Stop conditions:
| Key | Type | Description |
|---|---|---|
| max_iterations | int | Stop after this many runs |
| max_confidence | int | Stop when confidence reaches this level (0-100) |
| target_outcome | str | Outcome to measure confidence against |
| stop_on | int | 1 = stop when ANY condition met, 2 = stop when BOTH met |
| default | str | Value to use after test concludes |
| notify | list[str] | Email addresses to notify when test concludes |
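One way to read the `stop_on` semantics is as a choice between "or" and "and" over the iteration and confidence thresholds. The sketch below is a hypothetical interpretation, not the platform's code; `should_stop` is an invented name, and defaulting `stop_on` to 1 when it is omitted is an assumption.

```python
def should_stop(iterations: int, confidence: float, stop: dict) -> bool:
    """Hypothetical reading of the stop conditions table."""
    hit_iterations = (
        "max_iterations" in stop and iterations >= stop["max_iterations"]
    )
    hit_confidence = (
        "max_confidence" in stop and confidence >= stop["max_confidence"]
    )
    # Assumption: stop_on defaults to 1 (ANY) when not provided.
    if stop.get("stop_on", 1) == 2:
        return hit_iterations and hit_confidence
    return hit_iterations or hit_confidence


both = {"max_iterations": 1000, "max_confidence": 95, "stop_on": 2}
either = {"max_iterations": 1000, "max_confidence": 95, "stop_on": 1}

print(should_stop(1000, 80, both))    # only one threshold reached
print(should_stop(1000, 80, either))  # one threshold is enough here
```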
get_test#
def get_test(self, name: str) -> str
Get the selected value for a test. Returns the same value throughout the session.
add_tests#
def add_tests(self, tests: list[dict])
Add multiple tests at once.
Usage:
```python
async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        # Simple A/B test
        greeting = context.add_test(
            name="greeting_style",
            options={
                "Hello! How can I help you today?": 0.5,
                "Hi there! What can I do for you?": 0.3,
                "Welcome! How may I assist you?": 0.2
            }
        )
        yield TextToSpeechEvent(text=greeting, voice="nova")

    if isinstance(event, TextEvent):
        # Get the same greeting value (consistent within session)
        greeting = context.get_test("greeting_style")
```
With stop conditions:
```python
context.add_test(
    name="upsell_approach",
    options={"direct": 0.5, "subtle": 0.5},
    stop={
        "max_iterations": 1000,
        "max_confidence": 95,
        "target_outcome": "conversion_rate",
        "default": "direct",
        "stop_on": 2,
        "notify": ["analytics@company.com"]
    }
)
```
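How the platform draws an option from the weight mapping is not documented here. One plausible reading is a weighted random choice, sketched below with the standard library; `pick_option` is hypothetical and the seed is there only to make the sketch repeatable.

```python
import random


def pick_option(options: dict[str, float], rng: random.Random) -> str:
    """Hypothetical weighted pick over an option-to-weight mapping."""
    if not options:
        raise ValueError("options must not be empty")
    if any(weight < 0 for weight in options.values()):
        raise ValueError("weights must be non-negative")
    names = list(options)
    weights = [options[name] for name in names]
    # random.choices treats weights as relative, so they need not sum to 1.
    return rng.choices(names, weights=weights, k=1)[0]


rng = random.Random(42)  # seeded only for repeatability
options = {"direct": 0.5, "subtle": 0.5}
picks = [pick_option(options, rng) for _ in range(1000)]
direct_share = picks.count("direct") / len(picks)
```

With equal weights, `direct_share` should land near 0.5 over many draws; within a single session the documented behavior is that `get_test` keeps returning the value chosen by `add_test`.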
Outcomes#
Track metrics and conversion events for analytics. Outcomes are linked to tests for A/B analysis.
add_outcome#
def add_outcome(self, name: str, type: str, description: str = "")
Add an outcome to track.
| Type | Python Type | Default | Use Case |
|---|---|---|---|
| "boolean" | bool | False | Flags, conversions |
| "integer" | int | 0 | Counters |
| "float" | float | 0.0 | Monetary values, scores |
| "string" | str | "" | Categories, selections |
get_outcome#
def get_outcome(self, name: str) -> Any
Get the current value of an outcome.
set_outcome#
def set_outcome(self, name: str, value: Any) -> Any
Set the value of an outcome. Returns the previous value.
trigger_outcome#
def trigger_outcome(self, name: str) -> Any
Trigger an outcome. For booleans, sets to True. For integers, increments by 1.
reset_outcome#
def reset_outcome(self, name: str) -> Any
Reset an outcome to its default value. Returns the previous value.
add_outcomes#
def add_outcomes(self, outcomes: list[dict])
Add multiple outcomes at once.
Usage:
```python
async def handler(event: Event, context: Context):
    if isinstance(event, StartEvent):
        # Define outcomes
        context.add_outcome("message_count", "integer", "Total messages")
        context.add_outcome("user_satisfied", "boolean", "User expressed satisfaction")
        context.add_outcome("order_value", "float", "Total order value")

    if isinstance(event, TextEvent):
        user_message = event.data.get("text", "N/A").lower()

        # Increment counter
        context.trigger_outcome("message_count")

        # Set boolean
        if "thank you" in user_message or "great" in user_message:
            context.trigger_outcome("user_satisfied")

        # Set numeric value
        if "order" in user_message:
            context.set_outcome("order_value", 29.99)

        # Get current values
        count = context.get_outcome("message_count")
        yield TextToSpeechEvent(
            text=f"This is message number {count}.",
            voice="nova"
        )
```
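To make the outcome semantics concrete, here is a small in-memory model of the behavior described in this section. `OutcomeStore` is a hypothetical stand-in, not the SDK class. Two points are assumptions rather than documented behavior: what `trigger_outcome` returns (the new value here) and what happens when a float or string outcome is triggered (an error here).

```python
from typing import Any

# Defaults per outcome type, taken from the table in this section.
DEFAULTS = {"boolean": False, "integer": 0, "float": 0.0, "string": ""}


class OutcomeStore:
    """Hypothetical in-memory model of the documented outcome semantics."""

    def __init__(self) -> None:
        self._types: dict[str, str] = {}
        self._values: dict[str, Any] = {}

    def add_outcome(self, name: str, type: str) -> None:
        self._types[name] = type
        self._values[name] = DEFAULTS[type]

    def get_outcome(self, name: str) -> Any:
        return self._values[name]

    def set_outcome(self, name: str, value: Any) -> Any:
        previous = self._values[name]
        self._values[name] = value
        return previous  # documented: returns the previous value

    def trigger_outcome(self, name: str) -> Any:
        kind = self._types[name]
        if kind == "boolean":
            self._values[name] = True
        elif kind == "integer":
            self._values[name] += 1
        else:
            # Assumption: triggering is only described for booleans and integers.
            raise TypeError(f"cannot trigger a {kind} outcome")
        return self._values[name]  # assumption: returns the new value

    def reset_outcome(self, name: str) -> Any:
        return self.set_outcome(name, DEFAULTS[self._types[name]])


store = OutcomeStore()
store.add_outcome("message_count", "integer")
store.trigger_outcome("message_count")
store.trigger_outcome("message_count")
previous = store.reset_outcome("message_count")
```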
Testing Metadata#
Store arbitrary metadata associated with the testing session.
set_testing_metadata#
def set_testing_metadata(self, key: str, value: Any)
Set a metadata value.
get_testing_metadata#
def get_testing_metadata(self, key: str, default: Any = None) -> Any
Get a metadata value.
Usage:
```python
# Store metadata about the test session
context.set_testing_metadata("segment", "returning_customer")
context.set_testing_metadata("source", "phone")

# Retrieve metadata
segment = context.get_testing_metadata("segment")
```
