kiln_ai.adapters.eval.eval_runner

import asyncio
import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun

logger = logging.getLogger(__name__)


@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None


@dataclass
class EvalProgress:
    complete: int | None = None
    total: int | None = None
    errors: int | None = None


class EvalRunner:
    """
    Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

    Can run an eval in 2 modes:
    1) eval_config_eval: evaluate an eval config using existing dataset items.
    2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
    """

    def __init__(
        self,
        eval_configs: List[EvalConfig],
        run_configs: List[TaskRunConfig] | None,
        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
    ):
        if len(eval_configs) == 0:
            raise ValueError("Eval runner requires at least one eval config")
        target_eval = eval_configs[0].parent_eval()
        if target_eval is None:
            raise ValueError("Eval config requires a parent eval")
        for eval_config in eval_configs:
            parent_eval = eval_config.parent_eval()
            if parent_eval is None:
                raise ValueError("Eval config requires a parent eval")
            if parent_eval.id != target_eval.id:
                raise ValueError("All eval configs must have the same parent eval")

        target_task = target_eval.parent_task()
        if target_task is None:
            raise ValueError("Eval config requires a (grand)parent task")

        # Check that run_configs is compatible
        if eval_run_type == "task_run_eval":
            if run_configs is None or len(run_configs) == 0:
                raise ValueError("Task run eval requires run configs")
            for run_config in run_configs:
                parent_task = run_config.parent_task()
                if parent_task is None:
                    raise ValueError("All run configs must have a parent task")
                if parent_task.id != target_task.id:
                    raise ValueError(
                        "Run config is not for the same task as the eval configs"
                    )
        else:
            if run_configs is not None:
                raise ValueError("Mode 'eval_config_eval' does not support run configs")

        self.eval_run_type = eval_run_type
        self.eval_configs = eval_configs
        self.run_configs = run_configs
        self.task = target_task
        self.eval = target_eval

    def collect_tasks(self) -> List[EvalJob]:
        if self.eval_run_type == "eval_config_eval":
            return self.collect_tasks_for_eval_config_eval()
        else:
            return self.collect_tasks_for_task_run_eval()

    def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

        The tasks:
        - should be in the eval config set filter
        - should not have already been run for this eval config + dataset item pair
        """
        filter = dataset_filter_from_id(self.eval.eval_configs_filter_id)

        # already_run[eval_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = set()
            for run in eval_config.runs(readonly=True):
                already_run[eval_config.id].add(run.dataset_id)

        return [
            EvalJob(
                item=task_run,
                eval_config=eval_config,
                type="eval_config_eval",
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            if task_run.id not in already_run[eval_config.id]
        ]

    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

        The tasks:
        - should be in the eval set filter
        - should not have already been run for this eval config + run config + dataset item
        """
        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

        # already_run[eval_config_id][run_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = {}
            for run_config in self.run_configs or []:
                already_run[eval_config.id][run_config.id] = set()
            for run in eval_config.runs(readonly=True):
                if (
                    run.task_run_config_id is not None
                    and run.task_run_config_id in already_run[eval_config.id]
                ):
                    already_run[eval_config.id][run.task_run_config_id].add(
                        run.dataset_id
                    )

        return [
            EvalJob(
                item=task_run,
                task_run_config=run_config,
                type="task_run_eval",
                eval_config=eval_config,
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            for run_config in self.run_configs or []
            if task_run.id not in already_run[eval_config.id][run_config.id]
        ]

    async def run(self, concurrency: int = 25) -> AsyncGenerator[EvalProgress, None]:
        """
        Runs the configured eval run with parallel workers and yields progress updates.
        """
        jobs = self.collect_tasks()

        complete = 0
        errors = 0
        total = len(jobs)

        # Send initial status
        yield EvalProgress(complete=complete, total=total, errors=errors)

        worker_queue: asyncio.Queue[EvalJob] = asyncio.Queue()
        for job in jobs:
            worker_queue.put_nowait(job)

        # simple status queue to return progress. True=success, False=error
        status_queue: asyncio.Queue[bool] = asyncio.Queue()

        workers = []
        for i in range(concurrency):
            task = asyncio.create_task(self.run_worker(worker_queue, status_queue))
            workers.append(task)

        # Send status updates until workers are done, and they are all sent
        while not status_queue.empty() or not all(worker.done() for worker in workers):
            try:
                # Use timeout to prevent hanging if all workers complete
                # between our while condition check and get()
                success = await asyncio.wait_for(status_queue.get(), timeout=0.1)
                if success:
                    complete += 1
                else:
                    errors += 1

                yield EvalProgress(complete=complete, total=total, errors=errors)
            except asyncio.TimeoutError:
                # Timeout is expected, just continue to recheck worker status
                # Don't love this but beats sentinels for reliability
                continue

        # These are redundant, but keeping them will catch async errors
        await asyncio.gather(*workers)
        await worker_queue.join()

    async def run_worker(
        self, worker_queue: asyncio.Queue[EvalJob], status_queue: asyncio.Queue[bool]
    ):
        while True:
            try:
                job = worker_queue.get_nowait()
            except asyncio.QueueEmpty:
                # worker can end when the queue is empty
                break
            try:
                success = await self.run_job(job)
                await status_queue.put(success)
            finally:
                # Always mark the dequeued task as done, even on exceptions
                worker_queue.task_done()

    async def run_job(self, job: EvalJob) -> bool:
        try:
            # Create the evaluator for this eval config/run config pair
            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
                job.eval_config,
                job.task_run_config.run_config() if job.task_run_config else None,
            )
            if not isinstance(evaluator, BaseEval):
                raise ValueError("Not able to create evaluator from eval config")

            task_output: str | None = None
            scores: EvalScores | None = None
            intermediate_outputs: Dict[str, str] | None = None
            if job.type == "eval_config_eval":
                # Eval config eval, we use the saved input from the task run, not invoking the task again
                scores, intermediate_outputs = await evaluator.run_eval(job.item)
                task_output = job.item.output.output
            else:
                # Task run eval, we invoke the task again to get a fresh output
                (
                    result_task_run,
                    scores,
                    intermediate_outputs,
                ) = await evaluator.run_task_and_eval(job.item.input)
                task_output = result_task_run.output.output

            # Save the job result
            eval_run = EvalRun(
                parent=job.eval_config,
                task_run_config_id=job.task_run_config.id
                if job.task_run_config
                else None,
                dataset_id=job.item.id,
                eval_config_eval=job.type == "eval_config_eval",
                scores=scores,
                input=job.item.input,
                output=task_output,
                intermediate_outputs=intermediate_outputs,
            )
            eval_run.save_to_file()

            return True
        except Exception as e:
            logger.error(f"Error running eval job for dataset item {job.item.id}: {e}")
            return False
logger = <Logger kiln_ai.adapters.eval.eval_runner (WARNING)>
@dataclass
class EvalJob:
EvalJob(item: kiln_ai.datamodel.TaskRun, type: Literal['task_run_eval', 'eval_config_eval'], eval_config: kiln_ai.datamodel.eval.EvalConfig, task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None)
type: Literal['task_run_eval', 'eval_config_eval']
task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None
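Which optional fields are set depends on the job type, as the inline comment in the source notes. A minimal construction sketch, assuming task_run, eval_config and run_config are already-loaded Kiln datamodel objects:

from kiln_ai.adapters.eval.eval_runner import EvalJob

# "task_run_eval": both eval_config and task_run_config are set.
job = EvalJob(
    item=task_run,
    type="task_run_eval",
    eval_config=eval_config,
    task_run_config=run_config,
)

# "eval_config_eval": only eval_config is set; task_run_config stays None.
job = EvalJob(
    item=task_run,
    type="eval_config_eval",
    eval_config=eval_config,
)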
@dataclass
class EvalProgress:
EvalProgress(complete: int | None = None, total: int | None = None, errors: int | None = None)
complete: int | None = None
total: int | None = None
errors: int | None = None
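All three fields default to None, although EvalRunner.run() below always populates them in the snapshots it yields. A small, hypothetical formatting helper as an illustration:

def format_progress(progress: EvalProgress) -> str:
    # Guard against None fields in case a snapshot was constructed manually
    if progress.total is None:
        return "no progress yet"
    return f"{progress.complete}/{progress.total} complete, {progress.errors} errors"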
class EvalRunner:

Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

Can run an eval in 2 modes:

  1) eval_config_eval: evaluate an eval config using existing dataset items.
  2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
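A construction sketch for both modes, assuming eval_configs and run_configs are already-loaded Kiln datamodel lists whose parent eval and task satisfy the checks in __init__ below:

from kiln_ai.adapters.eval.eval_runner import EvalRunner

# Mode 1: judge existing dataset items with each eval config (run_configs must be None).
config_eval_runner = EvalRunner(
    eval_configs=eval_configs,
    run_configs=None,
    eval_run_type="eval_config_eval",
)

# Mode 2: re-run the task with each run config, then score the fresh output.
task_run_eval_runner = EvalRunner(
    eval_configs=eval_configs,
    run_configs=run_configs,
    eval_run_type="task_run_eval",
)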

EvalRunner(eval_configs: List[kiln_ai.datamodel.eval.EvalConfig], run_configs: Optional[List[kiln_ai.datamodel.task.TaskRunConfig]], eval_run_type: Literal['eval_config_eval', 'task_run_eval'])
eval_run_type
eval_configs
run_configs
task
eval
def collect_tasks(self) -> List[EvalJob]:
def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

The tasks:

  • should be in the eval config set filter
  • should not have already been run for this eval config + dataset item pair
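In other words, the job list is the cross product of filtered task runs and eval configs, minus any pair that already has a saved EvalRun. An illustrative check, assuming a runner configured for this mode:

jobs = runner.collect_tasks_for_eval_config_eval()
pending_pairs = {(job.eval_config.id, job.item.id) for job in jobs}
print(f"{len(pending_pairs)} eval config / dataset item pairs left to score")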
def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

The tasks:

  • should be in the eval set filter
  • should not have already been run for this eval config + run config + dataset item
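Here deduplication is keyed by eval config, run config and dataset item together, so adding a new TaskRunConfig later only queues jobs for that config. An illustrative check, assuming a runner configured for this mode:

jobs = runner.collect_tasks_for_task_run_eval()
pending = {
    (job.eval_config.id, job.task_run_config.id, job.item.id)
    for job in jobs
    if job.task_run_config is not None  # always set for "task_run_eval" jobs
}
print(f"{len(pending)} eval config / run config / dataset item triples left to run")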
async def run(self, concurrency: int = 25) -> AsyncGenerator[EvalProgress, None]:

Runs the configured eval run with parallel workers and yields progress updates.
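A sketch of driving run() to completion and summarizing the final snapshot, assuming runner was constructed as in the earlier example:

import asyncio

from kiln_ai.adapters.eval.eval_runner import EvalProgress, EvalRunner

async def run_to_completion(runner: EvalRunner) -> EvalProgress:
    last = EvalProgress(complete=0, total=0, errors=0)
    async for progress in runner.run(concurrency=25):
        last = progress  # keep the most recent snapshot
    return last

final = asyncio.run(run_to_completion(runner))
print(f"done: {final.complete} succeeded, {final.errors} failed, {final.total} total")

Because collect_tasks() skips items that already have a saved EvalRun, calling run() again on the same data only processes the jobs that are still pending.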

async def run_worker(self, worker_queue: asyncio.Queue[EvalJob], status_queue: asyncio.Queue[bool]):
async def run_job(self, job: EvalJob) -> bool: