kiln_ai.adapters.eval.eval_runner
import asyncio
import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun

logger = logging.getLogger(__name__)


@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None


@dataclass
class EvalProgress:
    complete: int | None = None
    total: int | None = None
    errors: int | None = None


class EvalRunner:
    """
    Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

    Can run an eval in 2 modes:
    1) eval_config_eval: evaluate an eval config using existing dataset items.
    2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
    """

    def __init__(
        self,
        eval_configs: List[EvalConfig],
        run_configs: List[TaskRunConfig] | None,
        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
    ):
        if len(eval_configs) == 0:
            raise ValueError("Eval runner requires at least one eval config")
        target_eval = eval_configs[0].parent_eval()
        if target_eval is None:
            raise ValueError("Eval config requires a parent eval")
        for eval_config in eval_configs:
            parent_eval = eval_config.parent_eval()
            if parent_eval is None:
                raise ValueError("Eval config requires a parent eval")
            if parent_eval.id != target_eval.id:
                raise ValueError("All eval configs must have the same parent eval")

        target_task = target_eval.parent_task()
        if target_task is None:
            raise ValueError("Eval config requires a (grand)parent task")

        # Check that run_configs is compatible
        if eval_run_type == "task_run_eval":
            if run_configs is None or len(run_configs) == 0:
                raise ValueError("Task run eval requires run configs")
            for run_config in run_configs:
                parent_task = run_config.parent_task()
                if parent_task is None:
                    raise ValueError("All run configs must have a parent task")
                if parent_task.id != target_task.id:
                    raise ValueError(
                        "Run config is not for the same task as the eval configs"
                    )
        else:
            if run_configs is not None:
                raise ValueError("Mode 'eval_config_eval' does not support run configs")

        self.eval_run_type = eval_run_type
        self.eval_configs = eval_configs
        self.run_configs = run_configs
        self.task = target_task
        self.eval = target_eval

    def collect_tasks(self) -> List[EvalJob]:
        if self.eval_run_type == "eval_config_eval":
            return self.collect_tasks_for_eval_config_eval()
        else:
            return self.collect_tasks_for_task_run_eval()

    def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

        The tasks:
        - should be in the eval config set filter
        - should not have already been run for this eval config + dataset item pair
        """
        filter = dataset_filter_from_id(self.eval.eval_configs_filter_id)

        # already_run[eval_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = set()
            for run in eval_config.runs(readonly=True):
                already_run[eval_config.id].add(run.dataset_id)

        return [
            EvalJob(
                item=task_run,
                eval_config=eval_config,
                type="eval_config_eval",
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            if task_run.id not in already_run[eval_config.id]
        ]

    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

        The tasks:
        - should be in the eval set filter
        - should not have already been run for this eval config + run config + dataset item
        """
        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

        # already_run[eval_config_id][run_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = {}
            for run_config in self.run_configs or []:
                already_run[eval_config.id][run_config.id] = set()
            for run in eval_config.runs(readonly=True):
                if (
                    run.task_run_config_id is not None
                    and run.task_run_config_id in already_run[eval_config.id]
                ):
                    already_run[eval_config.id][run.task_run_config_id].add(
                        run.dataset_id
                    )

        return [
            EvalJob(
                item=task_run,
                task_run_config=run_config,
                type="task_run_eval",
                eval_config=eval_config,
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            for run_config in self.run_configs or []
            if task_run.id not in already_run[eval_config.id][run_config.id]
        ]

    async def run(self, concurrency: int = 25) -> AsyncGenerator[EvalProgress, None]:
        """
        Runs the configured eval run with parallel workers and yields progress updates.
        """
        jobs = self.collect_tasks()

        complete = 0
        errors = 0
        total = len(jobs)

        # Send initial status
        yield EvalProgress(complete=complete, total=total, errors=errors)

        worker_queue: asyncio.Queue[EvalJob] = asyncio.Queue()
        for job in jobs:
            worker_queue.put_nowait(job)

        # simple status queue to return progress. True=success, False=error
        status_queue: asyncio.Queue[bool] = asyncio.Queue()

        workers = []
        for i in range(concurrency):
            task = asyncio.create_task(self.run_worker(worker_queue, status_queue))
            workers.append(task)

        # Send status updates until workers are done, and they are all sent
        while not status_queue.empty() or not all(worker.done() for worker in workers):
            try:
                # Use timeout to prevent hanging if all workers complete
                # between our while condition check and get()
                success = await asyncio.wait_for(status_queue.get(), timeout=0.1)
                if success:
                    complete += 1
                else:
                    errors += 1

                yield EvalProgress(complete=complete, total=total, errors=errors)
            except asyncio.TimeoutError:
                # Timeout is expected, just continue to recheck worker status
                # Don't love this but beats sentinels for reliability
                continue

        # These are redundant, but keeping them will catch async errors
        await asyncio.gather(*workers)
        await worker_queue.join()

    async def run_worker(
        self, worker_queue: asyncio.Queue[EvalJob], status_queue: asyncio.Queue[bool]
    ):
        while True:
            try:
                job = worker_queue.get_nowait()
            except asyncio.QueueEmpty:
                # worker can end when the queue is empty
                break
            try:
                success = await self.run_job(job)
                await status_queue.put(success)
            finally:
                # Always mark the dequeued task as done, even on exceptions
                worker_queue.task_done()

    async def run_job(self, job: EvalJob) -> bool:
        try:
            # Create the evaluator for this eval config/run config pair
            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
                job.eval_config,
                job.task_run_config.run_config() if job.task_run_config else None,
            )
            if not isinstance(evaluator, BaseEval):
                raise ValueError("Not able to create evaluator from eval config")

            task_output: str | None = None
            scores: EvalScores | None = None
            intermediate_outputs: Dict[str, str] | None = None
            if job.type == "eval_config_eval":
                # Eval config eval, we use the saved input from the task run, not invoking the task again
                scores, intermediate_outputs = await evaluator.run_eval(job.item)
                task_output = job.item.output.output
            else:
                # Task run eval, we invoke the task again to get a fresh output
                (
                    result_task_run,
                    scores,
                    intermediate_outputs,
                ) = await evaluator.run_task_and_eval(job.item.input)
                task_output = result_task_run.output.output

            # Save the job result
            eval_run = EvalRun(
                parent=job.eval_config,
                task_run_config_id=job.task_run_config.id
                if job.task_run_config
                else None,
                dataset_id=job.item.id,
                eval_config_eval=job.type == "eval_config_eval",
                scores=scores,
                input=job.item.input,
                output=task_output,
                intermediate_outputs=intermediate_outputs,
            )
            eval_run.save_to_file()

            return True
        except Exception as e:
            logger.error(f"Error running eval job for dataset item {job.item.id}: {e}")
            return False

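The listing above is the full module source. For orientation, here is a minimal usage sketch (not part of the Kiln codebase): it assumes eval_configs and run_configs are already-loaded EvalConfig and TaskRunConfig objects that share the same parent eval and task, runs the "task_run_eval" mode, and prints progress as jobs finish.

import asyncio
from typing import List

from kiln_ai.adapters.eval.eval_runner import EvalRunner
from kiln_ai.datamodel.eval import EvalConfig
from kiln_ai.datamodel.task import TaskRunConfig


async def run_task_run_eval(
    eval_configs: List[EvalConfig],
    run_configs: List[TaskRunConfig],
) -> None:
    # Both lists must belong to the same eval/task; __init__ validates this.
    runner = EvalRunner(
        eval_configs=eval_configs,
        run_configs=run_configs,
        eval_run_type="task_run_eval",
    )
    # run() is an async generator: it yields an initial EvalProgress, then one
    # cumulative update per finished job (success or error).
    async for progress in runner.run(concurrency=10):
        print(f"{progress.complete}/{progress.total} complete, {progress.errors} errors")


# asyncio.run(run_task_run_eval(my_eval_configs, my_run_configs))  # hypothetical objects
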
@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None

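EvalJob instances are normally constructed internally by EvalRunner.collect_tasks, but for illustration the two valid shapes look like this (task_run, eval_config, and run_config are hypothetical, already-loaded datamodel objects):

job_a = EvalJob(item=task_run, type="eval_config_eval", eval_config=eval_config)
job_b = EvalJob(
    item=task_run,
    type="task_run_eval",
    eval_config=eval_config,
    task_run_config=run_config,
)
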
class EvalRunner

Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.
Can run an eval in 2 modes:
1) eval_config_eval: evaluate an eval config using existing dataset items.
2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
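Mode selection is enforced in __init__: "task_run_eval" requires at least one TaskRunConfig, while "eval_config_eval" requires run_configs to be None; violating either raises ValueError. A sketch of the second mode, assuming eval_configs are already-loaded EvalConfig objects that share one parent eval:

# Evaluate the eval configs themselves against saved dataset items.
runner = EvalRunner(
    eval_configs=eval_configs,
    run_configs=None,  # passing run configs in this mode raises ValueError
    eval_run_type="eval_config_eval",
)
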
def __init__(
    self,
    eval_configs: List[EvalConfig],
    run_configs: List[TaskRunConfig] | None,
    eval_run_type: Literal["eval_config_eval", "task_run_eval"],
)

Validates the inputs: at least one eval config must be provided, all eval configs must share the same parent eval (and grandparent task), run configs are required for "task_run_eval" and must belong to that same task, and run configs are not allowed for "eval_config_eval". Any violation raises ValueError.

def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]

Collect all jobs for this run, excluding any that have already been run.
This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).
The tasks:
- should be in the eval config set filter
- should not have already been run for this eval config + dataset item pair
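A sketch of the bookkeeping this method builds before emitting jobs; the ids are made up for illustration:

# already_run[eval_config_id] -> dataset item ids that already have an EvalRun
already_run = {
    "eval_config_1": {"item_1", "item_2"},
    "eval_config_2": {"item_1"},
}
# One job is emitted per (dataset item passing the filter, eval config) pair
# whose dataset id is not in the corresponding set.
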
def collect_tasks_for_task_run_eval(self) -> List[EvalJob]

Collect all jobs for this run, excluding any that have already been run.
This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.
The tasks:
- should be in the eval set filter
- should not have already been run for this eval config + run config + dataset item
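This variant keys its bookkeeping by both eval config and run config. With made-up ids, the structure and the resulting job set look like:

# already_run[eval_config_id][run_config_id] -> dataset item ids already evaluated
already_run = {
    "eval_config_1": {
        "run_config_a": {"item_1"},
        "run_config_b": set(),
    },
}
# Jobs are the cross product of (filtered dataset items x eval configs x run
# configs), minus the combinations already recorded above.
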
async def run(self, concurrency: int = 25) -> AsyncGenerator[EvalProgress, None]

Runs the configured eval run with parallel workers and yields progress updates.
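Each yielded EvalProgress is cumulative: complete counts successful jobs, errors counts failed ones, and total is the number of jobs collected up front. The fields are declared Optional, so a defensive consumer might guard against None, as in this sketch (runner is an already-constructed EvalRunner):

from kiln_ai.adapters.eval.eval_runner import EvalRunner


async def report_progress(runner: EvalRunner) -> int:
    errors = 0
    async for progress in runner.run():
        done = (progress.complete or 0) + (progress.errors or 0)
        total = progress.total or 0
        errors = progress.errors or 0
        if total:
            print(f"{done}/{total} jobs finished, {errors} failed")
    return errors
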
async def run_worker(self, worker_queue: asyncio.Queue[EvalJob], status_queue: asyncio.Queue[bool])

Pulls jobs off the worker queue until it is empty, runs each one via run_job, and reports each result on the status queue (True for success, False for error). The dequeued job is always marked done, even if running it raises.

async def run_job(self, job: EvalJob) -> bool

Builds the evaluator for the job's eval config, runs it against the dataset item (reusing the saved output in "eval_config_eval" mode, or invoking the task again in "task_run_eval" mode), saves the resulting EvalRun to file, and returns True. Any exception is logged and reported as False rather than propagated, so a failing job shows up as an error count in the progress updates instead of aborting the run.