import asyncio
import os
import time
from typing import Any

from synth_ai.data.artifacts import Artifact
from synth_ai.data.enums import SuccessStatus
from synth_ai.sdk.environment_pools import AgentSpec, create_rollout, get_rollout
from synth_ai.sdk.localapi import LocalAPIConfig, create_local_api
from synth_ai.sdk.localapi._impl.contracts import RolloutRequest, RolloutResponse, TaskInfo
from synth_ai.sdk.localapi._impl.rollout_helpers import build_rollout_response

from linkedin_bench.skill_template import get_skill_content
from linkedin_bench.tasks import TASKS, get_task_by_seed
# Identifier passed to LocalAPIConfig as ``app_id`` when the app is built below.
APP_ID = "linkedin_bench_env_pools"
# Display name passed to LocalAPIConfig as ``name``.
APP_NAME = "LinkedIn Env Pools Proxy"
def provide_task_instances(seeds=None) -> list[TaskInfo]:
    """Describe every benchmark task as a ``TaskInfo`` record.

    The ``seeds`` argument is accepted but never consulted: the complete
    ``TASKS`` list is always returned, in order.

    Each record uses the task's position in ``TASKS`` (as a string) for
    ``id``, the task's own ``id`` for ``name`` and the task prompt for
    ``description``.
    """
    instances = []
    for position, entry in enumerate(TASKS):
        info = TaskInfo(id=str(position), name=entry.id, description=entry.prompt)
        instances.append(info)
    return instances
def _skill_from_overrides(request: RolloutRequest) -> str:
    """Pick the skill text to use for a rollout request.

    The default skill comes from ``get_skill_content()``. If any entry in
    ``request.context_overrides`` carries a file artifact whose path contains
    ``"skill"`` (case-insensitive), the content of the first such artifact is
    returned instead.
    """
    default_skill = get_skill_content()
    # An absent or empty override list simply yields no iterations.
    for entry in request.context_overrides or ():
        artifacts = entry.file_artifacts
        if not artifacts:
            continue
        for artifact_path, artifact_body in artifacts.items():
            if "skill" in artifact_path.lower():
                return artifact_body
    return default_skill
def _poll_rollout(rollout_id: str, backend_url: str, api_key: str, timeout_sec: float) -> dict[str, Any]:
    """Poll a rollout until it reports a terminal status or the timeout elapses.

    This function blocks the calling thread: it sleeps between polls.

    Args:
        rollout_id: Identifier of the rollout to poll.
        backend_url: Base URL forwarded to ``get_rollout`` as ``backend_base``.
        api_key: API key forwarded to ``get_rollout``.
        timeout_sec: Maximum number of seconds to keep polling.

    Returns:
        The first payload whose ``"status"`` is one of the terminal values.
        If the deadline passes first, the payload of one final fetch is
        returned regardless of its status.
    """
    terminal_statuses = ("succeeded", "failed", "cancelled", "error", "completed")
    poll_interval_sec = 3.0

    # Use the monotonic clock so a wall-clock adjustment (NTP step, manual
    # change) can neither cut the timeout short nor extend it.
    deadline = time.monotonic() + timeout_sec
    while time.monotonic() < deadline:
        status = get_rollout(rollout_id, backend_base=backend_url, api_key=api_key, timeout=30)
        if status.get("status") in terminal_statuses:
            return status
        # Never sleep past the deadline; the final fetch below should happen
        # as soon as the timeout is reached.
        remaining = deadline - time.monotonic()
        time.sleep(max(0.0, min(poll_interval_sec, remaining)))
    return get_rollout(rollout_id, backend_base=backend_url, api_key=api_key, timeout=30)
def _build_env_pool_request(task: Any, skill_content: str) -> dict[str, Any]:
    """Assemble the environment-pools rollout payload for one benchmark task."""
    return {
        "task_ref": {"dataset": "linkedin_bench", "task_id": task.id},
        "agent": AgentSpec.claude_code().model_dump(exclude_none=True),
        "environment": {"backend": "browser"},
        "browser": {
            "task_prompt": task.prompt,
            "skill": skill_content,
            "skill_domain": "linkedin.com",
            "profile": os.environ.get("KERNEL_PROFILE", "linkedin"),
            "timeout_sec": task.timeout,
            "headless": True,
            "capture_screenshot": True,
            "verifier_model": "claude-sonnet-4-20250514",
            "expected": task.expected,
        },
        "pool_tags": ["browser", "kernel"],
        "timeouts": {"agent_sec": task.timeout},
    }


async def run_rollout(request: RolloutRequest, _fastapi_request: Any) -> RolloutResponse:
    """Proxy one rollout request into environment pools and report its reward.

    The task is selected from ``request.env.seed`` (a missing seed is treated
    as ``0``), a rollout is created on the backend given by
    ``ENV_POOLS_BASE_URL``, and the rollout is polled for up to
    ``task.timeout + 120`` seconds.

    Args:
        request: Incoming rollout request; supplies the seed and any context
            overrides used to pick the skill text.
        _fastapi_request: Unused.

    Returns:
        A response whose outcome reward is the rollout's ``reward_primary``
        (``0.0`` when absent or falsy). The status is ``SUCCESS`` only when
        that reward is positive.

    Raises:
        ValueError: If ``SYNTH_API_KEY`` is unset or blank.
        RuntimeError: If the create call returns neither ``rollout_id`` nor
            ``trial_id``.
    """
    backend_url = os.environ.get("ENV_POOLS_BASE_URL", "https://api.usesynth.ai")
    api_key = os.environ.get("SYNTH_API_KEY", "").strip()
    if not api_key:
        raise ValueError("SYNTH_API_KEY is required to call environment pools")

    seed = request.env.seed or 0
    task = get_task_by_seed(seed)
    skill_content = _skill_from_overrides(request)
    rollout_request = _build_env_pool_request(task, skill_content)

    # create_rollout and _poll_rollout are synchronous (the poller sleeps
    # between requests). Run them in a worker thread so this coroutine does
    # not freeze the event loop for the duration of the rollout.
    created = await asyncio.to_thread(
        create_rollout,
        backend_base=backend_url,
        api_key=api_key,
        request=rollout_request,
        timeout=60,
    )
    rollout_id = created.get("rollout_id") or created.get("trial_id")
    if not rollout_id:
        raise RuntimeError("Environment pools did not return a rollout_id")

    # Allow extra time beyond the agent timeout before giving up on polling.
    final = await asyncio.to_thread(
        _poll_rollout, rollout_id, backend_url, api_key, task.timeout + 120
    )

    reward = float(final.get("reward_primary") or 0.0)
    return build_rollout_response(
        request=request,
        outcome_reward=reward,
        success_status=SuccessStatus.SUCCESS if reward > 0 else SuccessStatus.FAILURE,
        status_detail=f"env pools status: {final.get('status')}",
        artifact=[
            Artifact(
                content={
                    "rollout_id": rollout_id,
                    "status": final.get("status"),
                    "pool_id": final.get("pool_id"),
                    "reward_primary": final.get("reward_primary"),
                },
                content_type="environment_pool_rollout",
            )
        ],
    )
# ASGI application object; served by uvicorn when this file is run as a script.
app = create_local_api(
    LocalAPIConfig(
        app_id=APP_ID,
        name=APP_NAME,
        description="Proxy LinkedIn bench rollouts into environment pools.",
        provide_task_instances=provide_task_instances,
        rollout=run_rollout,
        # NOTE(review): wildcard CORS allows any origin — confirm this is
        # intended outside local development.
        cors_origins=["*"],
    )
)
if __name__ == "__main__":
    # Script entry point: serve the app on every interface, on the port named
    # by the PORT environment variable (8030 when unset).
    import uvicorn

    listen_port = int(os.getenv("PORT", "8030"))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)