Event Rewards

Event rewards are step-level rewards attached to specific events within a session. They enable fine-grained credit assignment, allowing you to attribute rewards to individual LLM decisions, environment interactions, or runtime actions.

Overview

Unlike outcome rewards (which score the entire episode), event rewards provide per-step feedback:
  • Attach rewards to specific LLM decisions
  • Track achievement progress across steps
  • Enable shaped reward signals
  • Support multiple reward sources (environment, runner, evaluator, human)

Schema

class EventReward:
    id: int                      # Primary key
    event_id: int               # FK to events.id
    session_id: str             # FK to session_traces.session_id
    message_id: int | None      # Optional FK to messages.id
    turn_number: int | None     # Optional 1-based turn index
    reward_value: float         # Numeric reward
    reward_type: str | None     # shaped | sparse | achievement | achievement_delta | unique_achievement_delta | penalty | evaluator | human
    key: str | None             # e.g., achievement name
    annotation: dict | None     # Free-form JSON metadata
    source: str | None          # environment | runner | evaluator | human
    created_at: datetime        # Timestamp

Fields

event_id (required)
  • Foreign key to events.id
  • Links this reward to a specific event (LLM call, env step, runtime action)
reward_value (required)
  • Numeric reward value
  • Can be positive (reward) or negative (penalty)
  • No fixed scale; interpretation depends on reward_type
reward_type (optional)
  • "shaped": Dense reward signal for intermediate progress
  • "sparse": Reward only at key milestones
  • "achievement": Binary indicator for unlocking an achievement
  • "achievement_delta": Count of achievements unlocked
  • "unique_achievement_delta": Count of new achievements this episode
  • "penalty": Negative reward for undesirable behavior
  • "evaluator": Reward from automated judge/rubric
  • "human": Human-annotated reward
key (optional)
  • String identifier for specific reward criteria
  • Examples: "collected_wood", "solved_task", "tool_use_success"
source (optional)
  • "environment": Emitted by task environment
  • "runner": Computed by rollout executor
  • "evaluator": From automated judge/LLM evaluator
  • "human": Human annotation
annotation (optional)
  • Free-form JSON for additional context
  • Examples: {"delta": 3, "prev_count": 2, "curr_count": 5}
turn_number (optional)
  • 1-based turn index for conversational contexts
  • Useful for aligning rewards with dialogue turns
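
For illustration, a fully populated record might look like the following once stored (all values are hypothetical; in practice records are created via the tracer rather than constructed directly):

from datetime import datetime, timezone

# Hypothetical, fully populated EventReward record
EventReward(
    id=42,
    event_id=1017,
    session_id="episode_001",
    message_id=None,
    turn_number=3,
    reward_value=1.0,
    reward_type="achievement",
    key="collect_wood",
    annotation={"prev_count": 2, "curr_count": 3},
    source="environment",
    created_at=datetime.now(timezone.utc),
)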

Recording Event Rewards

From Task Apps

Task apps can record event rewards during rollout execution:
import time

from synth_ai.tracing_v3 import SessionTracer
# LMCAISEvent and TimeRecord are the tracing event/time types used below;
# import them from wherever your tracing_v3 installation exposes them.

# Create tracer
tracer = SessionTracer(db_path="traces.db", session_id="episode_001")

# Record LM event
lm_event = LMCAISEvent(
    system_instance_id="agent",
    time_record=TimeRecord(event_time=time.time()),
    model_name="gpt-4",
    call_records=[...],
)
event_id = tracer.record_event(lm_event)

# Record event reward for this decision
tracer.record_event_reward(
    event_id=event_id,
    reward_value=0.85,
    reward_type="achievement_delta",
    key="collected_wood",
    annotation={"prev_achievements": 2, "curr_achievements": 3},
    source="environment",
)

Automatic Recording from Environment Events

EnvironmentEvent instances with non-zero reward automatically create a sparse event reward:
# Record environment step
env_event = EnvironmentEvent(
    system_instance_id="crafter",
    time_record=TimeRecord(event_time=time.time()),
    reward=1.0,  # Non-zero reward
    system_state_after={...},
)
env_event_id = tracer.record_event(env_event)

# Automatically creates:
# EventReward(
#     event_id=env_event_id,
#     reward_value=1.0,
#     reward_type="sparse",
#     source="environment",
# )

Reward Types in Detail

Achievement Delta

Count of achievements that became true during this step.
# Before step: player has 2 achievements
# After step: player has 5 achievements
# Achievement delta = 3

tracer.record_event_reward(
    event_id=lm_event_id,
    reward_value=3.0,
    reward_type="achievement_delta",
    annotation={
        "prev_count": 2,
        "curr_count": 5,
        "unlocked": ["collect_wood", "craft_pickaxe", "defeat_zombie"],
    },
    source="environment",
)

Unique Achievement Delta

Count of new achievements this episode (first time unlocked).
# Episode history: [A, B] already unlocked
# This step unlocks: [B, C, D]
# Unique delta = 2 (C and D are new, B was already unlocked)

tracer.record_event_reward(
    event_id=lm_event_id,
    reward_value=2.0,
    reward_type="unique_achievement_delta",
    annotation={
        "episode_history": ["A", "B"],
        "step_unlocked": ["B", "C", "D"],
        "unique_this_step": ["C", "D"],
    },
    source="runner",
)

Per-Achievement Rewards

One reward per achievement unlocked:
# Record one reward for each achievement
for achievement in ["collect_wood", "craft_pickaxe"]:
    tracer.record_event_reward(
        event_id=lm_event_id,
        reward_value=1.0,
        reward_type="achievement",
        key=achievement,
        source="environment",
    )

Shaped Rewards

Dense rewards for incremental progress:
# Reward grows as the agent closes the distance to the goal
remaining_ratio = current_distance / initial_distance
reward = (1.0 - remaining_ratio) * 0.1  # Small reward for progress

tracer.record_event_reward(
    event_id=event_id,
    reward_value=reward,
    reward_type="shaped",
    annotation={"distance": current_distance},
    source="environment",
)

Evaluator Rewards

Rewards from automated judges/rubrics:
# After judging an event
tracer.record_event_reward(
    event_id=lm_event_id,
    reward_value=score,
    reward_type="evaluator",
    key="tool_use_quality",
    annotation={
        "rubric_criteria": {...},
        "judge_model": "gpt-4",
    },
    source="evaluator",
)

Querying Event Rewards

Get All Event Rewards for a Session

import sqlite3

conn = sqlite3.connect("traces.db")
cursor = conn.cursor()

cursor.execute(
    """
    SELECT 
        er.event_id,
        er.reward_value,
        er.reward_type,
        er.key,
        er.turn_number,
        e.event_type
    FROM event_rewards er
    JOIN events e ON er.event_id = e.id
    WHERE er.session_id = ?
    ORDER BY e.event_time
    """,
    ("episode_001",)
)

for row in cursor.fetchall():
    print(f"Event {row[0]}: {row[1]} ({row[2]}) - {row[3]}")

Get Event Rewards by Type

cursor.execute(
    """
    SELECT event_id, reward_value, key
    FROM event_rewards
    WHERE session_id = ? AND reward_type = ?
    ORDER BY created_at
    """,
    ("episode_001", "achievement")
)

for event_id, reward_value, key in cursor.fetchall():
    print(f"Event {event_id}: {key} -> {reward_value}")

Sum Event Rewards for RL Training

def get_step_rewards(session_id: str, conn: sqlite3.Connection) -> list[float]:
    """Get per-step rewards for RL training."""
    cursor = conn.cursor()
    
    # Get turn count
    cursor.execute(
        "SELECT COUNT(DISTINCT turn_number) FROM events WHERE session_id = ?",
        (session_id,)
    )
    num_turns = cursor.fetchone()[0]
    
    # Initialize reward vector
    rewards = [0.0] * num_turns
    
    # Sum event rewards by turn
    cursor.execute(
        """
        SELECT 
            e.turn_number,
            SUM(er.reward_value) as turn_reward
        FROM event_rewards er
        JOIN events e ON er.event_id = e.id
        WHERE er.session_id = ?
        GROUP BY e.turn_number
        ORDER BY e.turn_number
        """,
        (session_id,)
    )
    
    for turn, reward in cursor.fetchall():
        if turn is not None and 0 <= turn - 1 < num_turns:
            rewards[turn - 1] = reward
    
    return rewards
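
For example, the per-turn reward vector for a traced episode can be built like this (database path and session id are placeholders):

conn = sqlite3.connect("traces.db")
step_rewards = get_step_rewards("episode_001", conn)
print(step_rewards)  # e.g. [0.0, 1.0, 0.0, 3.0, ...]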

Use Cases

1. RL with Achievement-Based Rewards

# In task app rollout executor
async def rollout_executor(request: RolloutRequest) -> RolloutResponse:
    tracer = SessionTracer(session_id=run_id)  # run_id identifies this rollout (e.g. taken from the request)
    
    prev_achievements = set()
    
    for turn in range(max_turns):
        # Get LLM decision
        lm_event = await get_llm_decision(...)
        event_id = tracer.record_event(lm_event)
        
        # Execute actions
        env_response = await execute_actions(...)
        
        # Compute achievement delta
        curr_achievements = set(env_response.achievements)
        new_achievements = curr_achievements - prev_achievements
        achievement_delta = len(curr_achievements) - len(prev_achievements)
        unique_delta = len(new_achievements)
        
        # Record both deltas
        tracer.record_event_reward(
            event_id=event_id,
            reward_value=float(achievement_delta),
            reward_type="achievement_delta",
            source="environment",
        )
        
        tracer.record_event_reward(
            event_id=event_id,
            reward_value=float(unique_delta),
            reward_type="unique_achievement_delta",
            annotation={"new": list(new_achievements)},
            source="runner",
        )
        
        prev_achievements = curr_achievements
    
    return response  # RolloutResponse assembled from the recorded rollout (construction elided)

2. Filtering High-Quality Steps for SFT

# Query high-reward steps
cursor.execute(
    """
    SELECT 
        e.id,
        e.event_type,
        e.event_data,
        er.reward_value
    FROM events e
    JOIN event_rewards er ON e.id = er.event_id
    WHERE 
        er.session_id = ?
        AND er.reward_type = 'unique_achievement_delta'
        AND er.reward_value > 0
    ORDER BY er.reward_value DESC
    """,
    (session_id,)
)

# Export LLM calls with positive rewards for cloning
# (assuming event_data is stored as JSON text, parse it before indexing)
import json

high_quality_steps = []
for event_id, event_type, event_data, reward in cursor.fetchall():
    if event_type == "cais":
        record = json.loads(event_data)
        high_quality_steps.append({
            "messages": record["call_records"][0]["request_messages"],
            "response": record["call_records"][0]["response_message"],
            "reward": reward,
        })
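
A common next step is to write the filtered steps to a JSONL file for fine-tuning; the file name and record layout below are just one possible convention:

import json

with open("sft_high_quality.jsonl", "w") as f:
    for step in high_quality_steps:
        f.write(json.dumps({
            "messages": step["messages"] + [step["response"]],
            "metadata": {"reward": step["reward"]},
        }) + "\n")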

3. Combining Event and Outcome Rewards

# Weighted combination
def compute_combined_reward(session_id: str, conn: sqlite3.Connection, weights: dict) -> float:
    """
    Combine event rewards and outcome reward with weights.

    Args:
        session_id: Session to score
        conn: Open connection to the trace database
        weights: {"event": 0.3, "outcome": 0.7}
    """
    cursor = conn.cursor()

    # Sum all event rewards
    cursor.execute(
        "SELECT SUM(reward_value) FROM event_rewards WHERE session_id = ?",
        (session_id,)
    )
    event_sum = cursor.fetchone()[0] or 0.0
    
    # Get outcome reward (the row may be missing for incomplete sessions)
    cursor.execute(
        "SELECT total_reward FROM outcome_rewards WHERE session_id = ?",
        (session_id,)
    )
    row = cursor.fetchone()
    outcome = row[0] if row and row[0] is not None else 0.0
    
    # Weighted combination
    combined = (
        weights["event"] * event_sum +
        weights["outcome"] * outcome
    )
    
    return combined
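
A call might look like this (weights are illustrative):

conn = sqlite3.connect("traces.db")
combined = compute_combined_reward("episode_001", conn, weights={"event": 0.3, "outcome": 0.7})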

Best Practices

1. Be Consistent with Reward Types

# Good: Use specific types for specific purposes
tracer.record_event_reward(..., reward_type="achievement_delta")
tracer.record_event_reward(..., reward_type="unique_achievement_delta")
tracer.record_event_reward(..., reward_type="evaluator")

# Bad: Use generic types for everything
tracer.record_event_reward(..., reward_type="shaped")  # Too vague

2. Provide Context in Annotations

# Good: Rich annotation
tracer.record_event_reward(
    event_id=event_id,
    reward_value=3.0,
    reward_type="achievement_delta",
    annotation={
        "prev_count": 2,
        "curr_count": 5,
        "unlocked": ["wood", "pickaxe", "zombie"],
        "timestamp": time.time(),
    },
    source="environment",
)

# Bad: No annotation
tracer.record_event_reward(
    event_id=event_id,
    reward_value=3.0,
)

3. Use Keys for Specific Achievements

# Good: One reward per achievement with key
for achievement in new_achievements:
    tracer.record_event_reward(
        event_id=event_id,
        reward_value=1.0,
        reward_type="achievement",
        key=achievement,  # Specific achievement name
        source="environment",
    )

# Bad: Lump sum without key
tracer.record_event_reward(
    event_id=event_id,
    reward_value=len(new_achievements),
    reward_type="achievement",
    # No key - can't tell which achievements
)

4. Specify Source

# Always specify where the reward came from
tracer.record_event_reward(
    ...,
    source="environment",  # From env
)

tracer.record_event_reward(
    ...,
    source="evaluator",  # From judge
)

tracer.record_event_reward(
    ...,
    source="runner",  # Computed by rollout executor
)

See Also