synth_ai.sdk.environment_pools
Alpha
Environment Pools lets you run coding agents inside managed sandboxes with
SSE event streams, artifact capture, and usage reporting.
Environment Pools is available on Pro and Team plans. Requests from Free/Trial accounts will raise PlanGatingError.
Quick Start
from synth_ai.sdk.environment_pools import (
    EnvironmentPoolsClient,
    AgentSpec,
    PoolTask,
)

client = EnvironmentPoolsClient(api_key="sk_...")

# Create a pool
pool = client.pools.create(request={
    "pool_id": "my-pool",
    "pool_type": "openenv",
    "concurrency": 4,
})

# Add a task
task = PoolTask.from_openenv("task-1", "https://my-task-app.example.com")
client.pools.tasks.create(pool["pool_id"], request=task)

# Launch a rollout with an AgentSpec preset
handle = client.rollouts.create({
    "task_ref": {"dataset": "my-dataset", "task_id": "task-1"},
    "agent": AgentSpec.claude_code().model_dump(exclude_none=True),
    "task_app_url": "https://my-task-app.example.com",
    "timeouts": {"agent_sec": 300},
    "pool_id": "my-pool",
})

# Stream events
for event in handle.events():
    print(event)

# Get usage
print(handle.usage())
Key Types
EnvironmentPoolsClient
High-level client for creating pools, launching rollouts, and consuming events.
Sub-namespaces:
- client.rollouts: create, get, list, events, cancel, artifacts, usage
- client.pools: list, get, create, update, delete, plus client.pools.tasks.*
- client.credentials: store, list, delete, rotate
- client.capabilities: get, openapi_schema, schema_json
AgentSpec
Defines the agent harness and model. Use preset helpers to avoid invalid combinations:
# Preset methods (recommended)
AgentSpec.claude_code() # claude-sonnet-4.5
AgentSpec.opencode() # claude-sonnet-4.5
AgentSpec.codex() # gpt-5.2-codex
# With explicit model
AgentSpec.codex(model_id="gpt-5.1-codex-mini")
# Generic preset
AgentSpec.preset("claude-code", model_id="claude-sonnet-4.5")
Allowed models:
- claude-code: claude-sonnet-4.5
- opencode: claude-sonnet-4.5
- codex: gpt-5.2-codex, gpt-5.1-codex-mini
An invalid model_id raises a ValueError that lists the allowed models.
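The allow-list above can also be mirrored client-side, for example to check configuration before an AgentSpec is built. This is a minimal sketch and not the SDK's own validation: the table copies the list above, and `ALLOWED_MODELS` and `check_model` are hypothetical names introduced for illustration.

```python
# Hypothetical client-side copy of the documented allow-list.
# The SDK performs its own validation; this table may drift from it.
ALLOWED_MODELS = {
    "claude-code": ("claude-sonnet-4.5",),
    "opencode": ("claude-sonnet-4.5",),
    "codex": ("gpt-5.2-codex", "gpt-5.1-codex-mini"),
}


def check_model(harness: str, model_id: str) -> None:
    """Raise ValueError if model_id is not listed for the given harness."""
    allowed = ALLOWED_MODELS.get(harness)
    if allowed is None:
        raise ValueError(
            f"unknown harness {harness!r}; expected one of {sorted(ALLOWED_MODELS)}"
        )
    if model_id not in allowed:
        raise ValueError(
            f"model {model_id!r} is not allowed for {harness!r}; "
            f"allowed: {list(allowed)}"
        )
```

Calling `check_model("codex", "gpt-5.1-codex-mini")` returns silently, while `check_model("claude-code", "gpt-5.2-codex")` raises a ValueError naming the single allowed model.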
RolloutHandle
Returned by client.rollouts.create(...). Provides convenience methods:
| Method | Description |
|---|---|
| .get() | Fetch current rollout status |
| .summary() | Get rollout summary |
| .wait() | Block until terminal state |
| .events() | Stream SSE events (auto-reconnect by default) |
| .artifacts() | List artifacts |
| .download_zip() | Download all artifacts as zip |
| .download(path) | Download single artifact |
| .support_bundle() | Export debug bundle |
| .replay() | Replay rollout |
| .cancel() | Cancel running rollout |
| .usage() | Get usage/cost snapshot |
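A common sequence with these methods is to wait for completion, read usage, and save the artifact archive. The sketch below is duck-typed so it runs without the SDK. It assumes `.download_zip()` returns raw bytes (as `download_artifacts_zip` does) and passes the return values of `.wait()` and `.usage()` through unchanged, since their exact shapes are not documented here. `finish_rollout` is a hypothetical helper name.

```python
from pathlib import Path
from typing import Any


def finish_rollout(handle: Any, out_dir: str) -> dict:
    """Wait for a rollout, save its artifacts zip, and return a small report.

    `handle` is any object exposing the RolloutHandle methods listed above.
    """
    final_state = handle.wait()        # blocks until a terminal state
    usage = handle.usage()             # usage/cost snapshot
    archive = handle.download_zip()    # assumed to be raw zip bytes

    target = Path(out_dir) / "artifacts.zip"
    target.parent.mkdir(parents=True, exist_ok=True)
    target.write_bytes(archive)

    return {"final_state": final_state, "usage": usage, "zip_path": str(target)}
```

With the Quick Start handle this would be called as `finish_rollout(handle, "./out")`.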
PoolTask
Factory helpers for creating pool task instances:
# Docker/Harbor task
PoolTask.from_docker("task-1", "my-registry/my-image:latest", env={"FOO": "bar"})
# OpenEnv task
PoolTask.from_openenv("task-1", "https://my-task-app.example.com")
# Browser task
PoolTask.from_browser("task-1", "https://my-task-app.example.com", profile="linkedin")
RolloutRequestV1
Defines a rollout request. Required fields:
- task_ref (dataset + task_id)
- agent (harness + model)
- task_app_url
- timeouts
- One of: pool_id, pool_tags, or environment
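When requests are passed as plain dicts, the required-field rules above can be checked locally before calling the backend. This is a minimal sketch under one stated assumption: "one of" is read as "at least one", because the text does not say whether combining pool_id, pool_tags, and environment is rejected. `missing_rollout_fields` is a hypothetical helper, not part of the SDK.

```python
from typing import Any

REQUIRED_FIELDS = ("task_ref", "agent", "task_app_url", "timeouts")
PLACEMENT_FIELDS = ("pool_id", "pool_tags", "environment")


def missing_rollout_fields(request: dict) -> list:
    """Return a list of problems; an empty list means the shape looks valid."""
    problems = [name for name in REQUIRED_FIELDS if request.get(name) is None]
    # Assumption: at least one placement field must be set. Whether the
    # backend accepts more than one at a time is not stated in the docs.
    if not any(request.get(name) is not None for name in PLACEMENT_FIELDS):
        problems.append("one of " + ", ".join(PLACEMENT_FIELDS))
    return problems
```

This only checks presence; nested shapes such as the contents of task_ref or timeouts are left to the server and to RolloutRequestV1.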
PoolConfigV1
Configuration for managed pools and capacity.
EventEnvelope
Standard event wrapper for SSE streams with fields:
- id (cursor / Last-Event-ID)
- event (type: lifecycle, agent, verifier, reward, heartbeat)
- seq (monotonic sequence)
- data (payload)
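The envelope fields above are enough to build a small consumer that skips heartbeats, drops replayed events, and tracks a resume cursor. The sketch below assumes each streamed event is a dict whose keys match the field names above, and that seq strictly increases within one rollout; neither detail is confirmed here. `follow_events` is a hypothetical helper.

```python
from typing import Any, Iterable, Iterator, Optional, Tuple


def follow_events(
    events: Iterable[dict],
    skip_types: frozenset = frozenset({"heartbeat"}),
) -> Iterator[Tuple[Optional[str], dict]]:
    """Yield (cursor, event) pairs, dropping replays and skipped event types.

    The cursor is the `id` of the yielded event. It can be persisted and
    later passed as `cursor=` to resume a stream.
    """
    last_seq = -1
    for event in events:
        seq = event.get("seq")
        # Assumed: seq is strictly increasing, so a value at or below the
        # last one seen is a replay (for example after a reconnect).
        if isinstance(seq, int):
            if seq <= last_seq:
                continue
            last_seq = seq
        if event.get("event") in skip_types:
            continue
        yield event.get("id"), event
```

With the Quick Start handle, `for cursor, event in follow_events(handle.events()): ...` would iterate only over non-heartbeat events.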
ArtifactManifest
Server-side metadata for artifacts produced by a rollout.
UsageSnapshot
Rollout usage data: wall time, sandbox minutes, LLM tokens, data transfer, estimated cost.
PlanGatingError
Raised when the account plan does not support Environment Pools.
Core Functions
create_rollout
create_rollout(
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    request: dict | RolloutRequestV1,
    idempotency_key: str | None = None,
    dry_run: bool | None = None,
    timeout: float = 120.0,
) -> dict[str, Any]
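The idempotency_key parameter is conventionally used so that a retried create call does not start a second rollout; the exact server-side semantics are not described here. One possible approach, sketched below, derives the key from the request body so that retries of the same request reuse the same key. `request_idempotency_key` is a hypothetical helper and the key format is an arbitrary choice.

```python
import hashlib
import json
from typing import Any


def request_idempotency_key(request: dict, prefix: str = "rollout") -> str:
    """Derive a stable key from the request body so retries reuse it."""
    # sort_keys and fixed separators make the JSON canonical, so dict
    # insertion order does not change the resulting key.
    canonical = json.dumps(request, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()
    return f"{prefix}-{digest[:32]}"
```

The result would be passed as `idempotency_key=request_idempotency_key(request)`. Because the key depends only on content, two intentionally identical rollouts would share a key; adding a run-specific field to the hashed dict avoids that.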
get_rollout
get_rollout(
    rollout_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
cancel_rollout
cancel_rollout(
    rollout_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
stream_rollout_events
stream_rollout_events(
    rollout_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    cursor: str | None = None,
    since: str | None = None,
    limit: int | None = None,
    auto_reconnect: bool = False,
    max_retries: int = 5,
    backoff_base: float = 1.0,
    backoff_max: float = 30.0,
) -> Iterator[dict[str, Any]]
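The signature does not say how max_retries, backoff_base, and backoff_max combine. A common reading is capped exponential backoff; the sketch below computes the delays under that assumption (doubling per attempt, no jitter), which may differ from what the SDK actually does. `backoff_schedule` is a hypothetical helper.

```python
def backoff_schedule(
    max_retries: int = 5,
    backoff_base: float = 1.0,
    backoff_max: float = 30.0,
) -> list:
    """Delays in seconds before each retry under capped exponential backoff."""
    return [
        min(backoff_base * (2 ** attempt), backoff_max)
        for attempt in range(max_retries)
    ]
```

Under these assumptions the default parameters give delays of 1, 2, 4, 8, and 16 seconds, so a stream would give up after roughly 31 seconds of cumulative waiting.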
get_rollout_usage
get_rollout_usage(
    rollout_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
get_rollout_support_bundle
get_rollout_support_bundle(
    rollout_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
list_rollout_artifacts
list_rollout_artifacts(
    rollout_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    prefix: str | None = None,
    cursor: str | None = None,
    limit: int | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
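The cursor and limit parameters suggest cursor-based pagination. The sketch below follows cursors until none is returned. The response keys are not documented here, so "artifacts" and "next_cursor" are assumptions exposed as parameters, and `iter_all_artifacts` is a hypothetical helper. The page-fetching function is passed in, which keeps the sketch runnable without the SDK.

```python
from typing import Any, Callable, Iterator, Optional


def iter_all_artifacts(
    fetch_page: Callable[..., dict],
    rollout_id: str,
    *,
    prefix: Optional[str] = None,
    page_size: int = 100,
    items_key: str = "artifacts",      # assumed response key
    cursor_key: str = "next_cursor",   # assumed response key
) -> Iterator[Any]:
    """Yield every artifact entry, following cursors until none is returned."""
    cursor: Optional[str] = None
    while True:
        page = fetch_page(rollout_id, prefix=prefix, cursor=cursor, limit=page_size)
        yield from page.get(items_key, [])
        cursor = page.get(cursor_key)
        if not cursor:
            break
```

Usage would look like `list(iter_all_artifacts(list_rollout_artifacts, rollout_id))`, after checking a real response to confirm the two key names.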
fetch_artifact
fetch_artifact(
    rollout_id: str,
    artifact_path: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 60.0,
) -> bytes
download_artifacts_zip
download_artifacts_zip(
    rollout_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 120.0,
) -> bytes
create_pool
create_pool(
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    request: dict[str, Any],
    timeout: float = 30.0,
) -> dict[str, Any]
list_pools
list_pools(
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    pool_type: str | None = None,
    tag: str | None = None,
    timeout: float = 30.0,
) -> list[dict[str, Any]]
get_pool
get_pool(
    pool_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
update_pool
update_pool(
    pool_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    request: dict[str, Any],
    timeout: float = 30.0,
) -> dict[str, Any]
delete_pool
delete_pool(
    pool_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> None
create_pool_task
create_pool_task(
    pool_id: str,
    *,
    request: dict[str, Any] | PoolTaskInstance,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
list_pool_tasks
list_pool_tasks(
    pool_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> list[dict[str, Any]]
update_pool_task
update_pool_task(
    pool_id: str,
    task_id: str,
    *,
    request: dict[str, Any] | PoolTaskInstance,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
delete_pool_task
delete_pool_task(
    pool_id: str,
    task_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> None
store_credential
store_credential(
    *,
    credential_name: str,
    credential_type: str,
    credential_value: str | dict[str, Any],
    metadata: dict[str, Any] | None = None,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
list_credentials
list_credentials(
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> list[dict[str, Any]]
rotate_credential
rotate_credential(
    credential_id: str,
    *,
    new_value: str | dict[str, Any],
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
delete_credential
delete_credential(
    credential_id: str,
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> None
get_capabilities
get_capabilities(
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
Returns supported agents, models, and backends.
get_openapi_schema
get_openapi_schema(
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]
get_schema_json
get_schema_json(
    *,
    backend_base: str | None = None,
    api_key: str | None = None,
    timeout: float = 30.0,
) -> dict[str, Any]