
synth_ai.sdk.optimization.graph.job

Graph optimization job implementation. This module provides the canonical GraphOptimizationJob class for running graph/workflow optimization jobs. It replaces the old GraphOptimizationJob (from sdk/api/train) and the deprecated GraphEvolveJob. Backend endpoints: /api/graph-evolve/jobs, /api/graph_evolve/jobs. Algorithms:
  • graph_gepa: Graph Genetic Evolutionary Prompt Algorithm (the default and currently the only option)
Two construction modes:
  • from_config(): TOML config file (like the old GraphOptimizationJob)
  • from_dataset(): JSON dataset (like the old GraphEvolveJob)

Classes

Algorithm

Supported graph optimization algorithms.
Methods:

from_string

from_string(cls, value: str) -> Algorithm
Convert string to Algorithm enum.
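For illustration, converting a string (e.g. from a CLI flag or config value) into the enum might look like this; the import path follows this module's name, and "graph_gepa" is the algorithm identifier listed above:
>>> from synth_ai.sdk.optimization.graph.job import Algorithm
>>> algo = Algorithm.from_string("graph_gepa")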

GraphOptimizationJobConfig

Configuration for a graph optimization job. This dataclass holds all the configuration needed to submit and run a graph optimization job. Supports two modes:
  1. Config-based: Provide config_path pointing to a TOML file
  2. Dataset-based: Internal use via from_dataset() constructor
Attributes:
  • backend_url: Base URL of the Synth API backend.
  • api_key: Synth API key for authentication.
  • config_path: Path to the TOML configuration file (config mode).
  • config_dict: Dictionary configuration (programmatic mode).
  • algorithm: Optimization algorithm to use.
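As a sketch, constructing the config directly with the documented fields might look like the following; the URL and key values are placeholders, and in practice most users go through from_config() or from_dataset() instead:
>>> config = GraphOptimizationJobConfig(
...     backend_url="https://api.example.com",
...     api_key="YOUR_SYNTH_API_KEY",
...     config_path="my_config.toml",
... )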

GraphOptimizationJob

High-level SDK class for running graph optimization jobs. This is the canonical class for graph optimization, unifying the old GraphOptimizationJob (config-based) and GraphEvolveJob (dataset-based) under a single API. Two construction modes:
  • from_config(): Create from TOML config file
  • from_dataset(): Create from JSON dataset (auto-generates task app)
Example (config-based):
>>> from synth_ai.sdk.optimization.graph import GraphOptimizationJob
>>>
>>> job = GraphOptimizationJob.from_config("my_config.toml")
>>> job.submit()
>>> result = job.stream_until_complete()
>>> print(f"Best score: {result.best_score}")
Example (dataset-based):
>>> job = GraphOptimizationJob.from_dataset(
...     "tasks.json",
...     policy_models="gpt-4o-mini",
...     rollout_budget=100,
... )
>>> job.submit()
>>> result = job.stream_until_complete()
>>> print(f"Best score: {result.best_score}")
Attributes:
  • job_id: The job ID (None until submitted)
  • algorithm: The optimization algorithm being used
Methods:

from_config

from_config(cls, config_path: str | Path) -> GraphOptimizationJob
Create a job from a TOML config file. Args:
  • config_path: Path to TOML config file
  • backend_url: Backend API URL (defaults to env or production)
  • api_key: API key (defaults to SYNTH_API_KEY env var)
Returns:
  • GraphOptimizationJob instance
Raises:
  • ValueError: If required config is missing
  • FileNotFoundError: If config file doesn’t exist

from_dict

from_dict(cls, config_dict: Dict[str, Any]) -> GraphOptimizationJob
Create a job from a configuration dictionary. Args:
  • config_dict: Configuration dictionary
  • backend_url: Backend API URL (defaults to env or production)
  • api_key: API key (defaults to SYNTH_API_KEY env var)
Returns:
  • GraphOptimizationJob instance
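A minimal sketch of the dictionary mode is below; the dictionary keys mirror the TOML config schema, and the key shown here is a hypothetical minimal example rather than a complete configuration:
>>> config_dict = {"algorithm": "graph_gepa"}  # hypothetical minimal config
>>> job = GraphOptimizationJob.from_dict(config_dict)
>>> job.submit()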

from_dataset

from_dataset(cls, dataset: Any) -> GraphOptimizationJob
Create a job from a JSON dataset (GraphEvolve style). This mode auto-generates task apps from the dataset; no user-managed task apps are required. Args:
  • dataset: Dataset as file path, dict, or GraphEvolveTaskSet object
  • policy_models: Model(s) to use for policy inference
  • rollout_budget: Total number of rollouts for optimization
  • proposer_effort: Proposer effort level (low, medium, high)
  • judge_model: Override judge model from dataset
  • judge_provider: Override judge provider from dataset
  • population_size: Population size for GEPA
  • num_generations: Number of generations (auto-calculated if not specified)
  • problem_spec: Detailed problem specification for the graph proposer
  • target_llm_calls: Target number of LLM calls for the graph (1-10)
  • graph_type: Type of graph to train ("policy", "verifier", or "rlm")
  • initial_graph_id: Preset graph ID to optimize (required)
  • backend_url: Backend API URL (defaults to env or production)
  • api_key: API key (defaults to SYNTH_API_KEY env var)
  • auto_start: Whether to start the job immediately
  • metadata: Additional metadata for the job
Returns:
  • GraphOptimizationJob instance

from_job_id

from_job_id(cls, job_id: str, backend_url: Optional[str] = None, api_key: Optional[str] = None) -> GraphOptimizationJob
Resume an existing job by ID. Args:
  • job_id: Existing job ID
  • backend_url: Backend API URL (defaults to env or production)
  • api_key: API key (defaults to SYNTH_API_KEY env var)
Returns:
  • GraphOptimizationJob instance for the existing job
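Resuming a previously submitted job might look like this; the job ID is a placeholder, and backend_url/api_key fall back to environment defaults as documented above:
>>> job = GraphOptimizationJob.from_job_id("job_abc123")
>>> status = job.get_status()
>>> result = job.stream_until_complete()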

job_id

job_id(self) -> Optional[str]
Get the job ID (None if not yet submitted).

algorithm

algorithm(self) -> Algorithm
Get the optimization algorithm.

submit

submit(self) -> str
Submit the job to the backend. Returns:
  • Job ID
Raises:
  • RuntimeError: If job submission fails

get_status

get_status(self) -> Dict[str, Any]
Get current job status. Returns:
  • Job status dictionary
Raises:
  • RuntimeError: If job hasn’t been submitted yet
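A typical submit-then-check sequence, given a job constructed via one of the factory methods above, might look like this (the exact keys of the status dictionary are defined by the backend, so we print it whole rather than assume a shape):
>>> job_id = job.submit()
>>> status = job.get_status()
>>> print(status)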

poll_until_complete

poll_until_complete(self) -> GraphOptimizationResult
Poll job until it reaches a terminal state. Args:
  • timeout: Maximum seconds to wait for job completion
  • interval: Seconds between poll attempts
  • progress: If True, print status updates during polling
Returns:
  • GraphOptimizationResult with typed status, best_score, etc.
Raises:
  • RuntimeError: If job hasn’t been submitted yet
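Using the documented arguments, a polling call on a submitted job might look like the following; the timeout and interval values are illustrative, not defaults:
>>> result = job.poll_until_complete(timeout=3600, interval=15, progress=True)
>>> print(f"Best score: {result.best_score}")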

stream_until_complete

stream_until_complete(self) -> GraphOptimizationResult
Stream job events until completion using SSE. Args:
  • timeout: Maximum seconds to wait
  • interval: Seconds between status checks (for SSE reconnects)
  • handlers: Optional StreamHandler instances for custom event handling
  • on_event: Optional callback called on each event
Returns:
  • GraphOptimizationResult with typed status, best_score, etc.
Raises:
  • RuntimeError: If job hasn’t been submitted yet
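A sketch of streaming with a per-event callback is below; the shape of each event payload is defined by the backend, so this simply prints whatever arrives:
>>> def on_event(event):
...     print(event)
...
>>> result = job.stream_until_complete(on_event=on_event)
>>> print(f"Best score: {result.best_score}")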

cancel

cancel(self) -> Dict[str, Any]
Cancel a running job. Args:
  • reason: Optional reason for cancellation
Returns:
  • Dict with cancellation status
Raises:
  • RuntimeError: If job hasn’t been submitted yet
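Cancelling a submitted job with the optional documented reason might look like:
>>> response = job.cancel(reason="budget exhausted")
>>> print(response)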

download_graph_txt

download_graph_txt(self) -> str
Download a PUBLIC (redacted) graph export for a completed job. Returns:
  • Graph export text
Raises:
  • RuntimeError: If job hasn’t been submitted or delegate doesn’t support this
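For a completed job, saving the redacted export to disk might look like this (the filename is a placeholder):
>>> from pathlib import Path
>>> text = job.download_graph_txt()
>>> Path("graph_export.txt").write_text(text)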

run_inference

run_inference(self, input_data: Dict[str, Any]) -> Dict[str, Any]
Run inference with the optimized graph/workflow. Args:
  • input_data: Input data matching the task format
  • model: Override model (default: use job’s policy model)
  • graph_snapshot_id: Specific GraphSnapshot to use (default: best)
Returns:
  • Output dictionary
Raises:
  • RuntimeError: If job hasn’t been submitted or delegate doesn’t support this
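A sketch of inference against the optimized graph is below; the input dictionary's keys are hypothetical and must match whatever task format the job was trained on, and graph_snapshot_id is shown with a placeholder value:
>>> output = job.run_inference({"input": "example task input"})
>>> # Optionally pin a specific snapshot instead of the best one:
>>> output = job.run_inference(
...     {"input": "example task input"},
...     graph_snapshot_id="snapshot_abc123",
... )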