kiln_ai.adapters.eval.eval_runner
```python
import json
import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

import litellm

from kiln_ai.adapters.adapter_registry import load_skills_for_task
from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.adapters.model_adapters.base_adapter import SkillsDict
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import DatasetFilterId, dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalDataType, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun, Usage
from kiln_ai.utils.async_job_runner import AsyncJobRunner, Progress, RetryableError
from kiln_ai.utils.git_sync_protocols import SaveContext, default_save_context

logger = logging.getLogger(__name__)


@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None


class EvalRunner:
    """
    Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

    Can run an eval in 2 modes:
    1) eval_config_eval: evaluate an eval config using existing dataset items.
    2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
    """

    def __init__(
        self,
        eval_configs: List[EvalConfig],
        run_configs: List[TaskRunConfig] | None,
        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
        save_context: SaveContext | None = None,
    ):
        if len(eval_configs) == 0:
            raise ValueError("Eval runner requires at least one eval config")
        target_eval = eval_configs[0].parent_eval()
        if target_eval is None:
            raise ValueError("Eval config requires a parent eval")
        for eval_config in eval_configs:
            parent_eval = eval_config.parent_eval()
            if parent_eval is None:
                raise ValueError("Eval config requires a parent eval")
            if parent_eval.id != target_eval.id:
                raise ValueError("All eval configs must have the same parent eval")

        target_task = target_eval.parent_task()
        if target_task is None:
            raise ValueError("Eval config requires a (grand)parent task")

        # Check that run_configs is compatible
        if eval_run_type == "task_run_eval":
            if run_configs is None or len(run_configs) == 0:
                raise ValueError("Task run eval requires run configs")
            for run_config in run_configs:
                parent_task = run_config.parent_task()
                if parent_task is None:
                    raise ValueError("All run configs must have a parent task")
                if parent_task.id != target_task.id:
                    raise ValueError(
                        "Run config is not for the same task as the eval configs"
                    )
        else:
            if run_configs is not None:
                raise ValueError("Mode 'eval_config_eval' does not support run configs")

        self.eval_run_type = eval_run_type
        self.eval_configs = eval_configs
        self.run_configs = run_configs
        self.task = target_task
        self.eval = target_eval
        self._skills: SkillsDict = self._preload_skills()
        self._save_context: SaveContext = save_context or default_save_context

    def collect_tasks(self) -> List[EvalJob]:
        if self.eval_run_type == "eval_config_eval":
            if self.eval.eval_configs_filter_id is not None:
                return self.collect_tasks_for_eval_config_eval(
                    self.eval.eval_configs_filter_id
                )
            else:
                raise ValueError(
                    "Eval configs filter ID is required for eval runs of type 'eval_config_eval'"
                )

        else:
            return self.collect_tasks_for_task_run_eval()

    def collect_tasks_for_eval_config_eval(
        self, eval_configs_filter_id: DatasetFilterId
    ) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

        The tasks:
        - should be in the eval config set filter
        - should not have already been run for this eval config + dataset item pair
        """
        filter = dataset_filter_from_id(eval_configs_filter_id)

        # already_run[eval_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = set()
            for run in eval_config.runs(readonly=True):
                already_run[eval_config.id].add(run.dataset_id)

        return [
            EvalJob(
                item=task_run,
                eval_config=eval_config,
                type="eval_config_eval",
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            if task_run.id not in already_run[eval_config.id]
        ]

    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

        The tasks:
        - should be in the eval set filter
        - should not have already been run for this eval config + run config + dataset item
        """
        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

        # already_run[eval_config_id][run_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = {}
            for run_config in self.run_configs or []:
                already_run[eval_config.id][run_config.id] = set()
            for run in eval_config.runs(readonly=True):
                if (
                    run.task_run_config_id is not None
                    and run.task_run_config_id in already_run[eval_config.id]
                ):
                    already_run[eval_config.id][run.task_run_config_id].add(
                        run.dataset_id
                    )

        return [
            EvalJob(
                item=task_run,
                task_run_config=run_config,
                type="task_run_eval",
                eval_config=eval_config,
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            for run_config in self.run_configs or []
            if task_run.id not in already_run[eval_config.id][run_config.id]
        ]

    def _preload_skills(self) -> SkillsDict:
        """Collect all skill IDs from run configs and bulk-load them once."""
        if self.run_configs is None:
            return {}
        merged: SkillsDict = {}
        for rc in self.run_configs:
            skills = load_skills_for_task(self.task, rc.run_config_properties)
            merged.update(skills)
        return merged

    async def run(self, concurrency: int = 25) -> AsyncGenerator[Progress, None]:
        """
        Runs the configured eval run with parallel workers and yields progress updates.
        """
        jobs = self.collect_tasks()

        runner = AsyncJobRunner(
            concurrency=concurrency,
            jobs=jobs,
            run_job_fn=self.run_job,
            max_retries=2,
        )
        async for progress in runner.run():
            yield progress

    async def run_job(self, job: EvalJob) -> bool:
        try:
            # Create the evaluator for this eval config/run config pair
            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
                job.eval_config,
                job.task_run_config.run_config_properties
                if job.task_run_config
                else None,
                skills=self._skills,
            )
            if not isinstance(evaluator, BaseEval):
                raise ValueError("Not able to create evaluator from eval config")

            task_output: str | None = None
            reference_answer: str | None = None
            trace: str | None = None
            scores: EvalScores | None = None
            intermediate_outputs: Dict[str, str] | None = None
            task_run_usage: Usage | None = None
            if job.type == "eval_config_eval":
                # Eval config eval, we use the saved input from the task run, not invoking the task again
                scores, intermediate_outputs = await evaluator.run_eval(job.item)
                task_output = job.item.output.output
                task_run_usage = job.item.usage
            else:
                # Task run eval, we invoke the task again to get a fresh output
                (
                    result_task_run,
                    scores,
                    intermediate_outputs,
                ) = await evaluator.run_task_and_eval(job.item)
                task_output = result_task_run.output.output
                task_run_usage = result_task_run.usage

                parent_eval = job.eval_config.parent_eval()
                if (
                    parent_eval
                    and parent_eval.evaluation_data_type == EvalDataType.full_trace
                    and result_task_run.trace
                ):
                    trace = json.dumps(result_task_run.trace, indent=2)

                if (
                    parent_eval
                    and parent_eval.evaluation_data_type
                    == EvalDataType.reference_answer
                ):
                    reference_answer = job.item.output.output

            # Save the job result
            async with self._save_context():
                eval_run = EvalRun(
                    parent=job.eval_config,
                    task_run_config_id=job.task_run_config.id
                    if job.task_run_config
                    else None,
                    dataset_id=job.item.id,
                    eval_config_eval=job.type == "eval_config_eval",
                    scores=scores,
                    input=job.item.input,
                    output=task_output,
                    reference_answer=reference_answer,
                    intermediate_outputs=intermediate_outputs,
                    task_run_trace=trace,
                    task_run_usage=task_run_usage,
                )
                eval_run.save_to_file()

            return True
        except Exception as e:
            if _is_retryable_error(e):
                logger.error(
                    f"Transient error running eval job for dataset item {job.item.id}: {e}",
                    exc_info=True,
                )
                raise RetryableError(str(e)) from e
            logger.error(
                f"Error running eval job for dataset item {job.item.id}: {e}",
                exc_info=True,
            )
            raise


def _is_retryable_error(e: BaseException) -> bool:
    if isinstance(
        e,
        (
            litellm.RateLimitError,
            litellm.APIConnectionError,
            litellm.InternalServerError,
            litellm.ServiceUnavailableError,
            litellm.BadGatewayError,
            litellm.JSONSchemaValidationError,
        ),
    ):
        return True

    # ValueError thrown by Kiln's adapter when structured output doesn't match schema
    if isinstance(
        e, ValueError
    ) and "This task requires a specific output schema" in str(e):
        return True

    return False
```
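For orientation, here is a minimal usage sketch. The `EvalRunner` API is exactly as defined above, but the config-loading lines are hypothetical placeholders: in practice you would obtain `EvalConfig` and `TaskRunConfig` objects from a Kiln project's datamodel on disk.

```python
import asyncio

from kiln_ai.adapters.eval.eval_runner import EvalRunner


async def main() -> None:
    # Placeholder: load real EvalConfig / TaskRunConfig objects from your
    # Kiln project here. All eval configs must share one parent Eval, and
    # all run configs must belong to that Eval's parent task, or __init__
    # raises ValueError.
    eval_configs = [...]  # List[EvalConfig]
    run_configs = [...]   # List[TaskRunConfig]

    runner = EvalRunner(
        eval_configs=eval_configs,
        run_configs=run_configs,
        eval_run_type="task_run_eval",
    )
    # For mode "eval_config_eval", pass run_configs=None instead;
    # passing run configs in that mode raises ValueError.

    # run() is an async generator: it fans jobs out to parallel workers
    # (25 by default) and yields a Progress update as jobs complete.
    async for progress in runner.run(concurrency=10):
        print(progress)


asyncio.run(main())
```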
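The dedup bookkeeping in `collect_tasks_for_task_run_eval` is plain nested dicts and sets keyed by IDs. Here is the same skip condition in isolation, with throwaway string IDs (illustrative values only):

```python
from typing import Dict, Set

# already_run[eval_config_id][run_config_id] -> dataset item ids already evaluated
already_run: Dict[str, Dict[str, Set[str]]] = {
    "cfg1": {"rc1": {"item1"}, "rc2": set()},
}

dataset_items = ["item1", "item2"]
jobs = [
    (item, cfg, rc)
    for item in dataset_items
    for cfg in already_run
    for rc in already_run[cfg]
    if item not in already_run[cfg][rc]
]
# ("item1", "cfg1", "rc1") is skipped because it was already run:
assert jobs == [
    ("item1", "cfg1", "rc2"),
    ("item2", "cfg1", "rc1"),
    ("item2", "cfg1", "rc2"),
]
```

Because `collect_tasks` consults the persisted `EvalRun`s each time, re-running the same `EvalRunner` is effectively resumable: only the missing (eval config, run config, dataset item) combinations become jobs.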
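`run_job` persists each result inside `async with self._save_context():`. Judging purely from that call site, a `SaveContext` is a zero-argument callable returning an async context manager (the default is `default_save_context` from `kiln_ai.utils.git_sync_protocols`). A hypothetical custom hook, assuming that protocol, might look like:

```python
from contextlib import asynccontextmanager
from typing import AsyncIterator


@asynccontextmanager
async def logging_save_context() -> AsyncIterator[None]:
    # Entered once per job, wrapping EvalRun.save_to_file() in run_job().
    print("persisting an eval run...")
    try:
        yield
    finally:
        print("eval run persisted")


# runner = EvalRunner(..., save_context=logging_save_context)
```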
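On failure, `run_job` classifies the exception: errors matching `_is_retryable_error` are re-raised as `RetryableError`, which `AsyncJobRunner` (constructed with `max_retries=2`) uses to retry the job, while anything else propagates and fails the job. The `ValueError` branch matches on a substring, which is easy to verify directly; `_is_retryable_error` is module-private, so this snippet is for illustration only:

```python
from kiln_ai.adapters.eval.eval_runner import _is_retryable_error

# The schema-mismatch ValueError raised by Kiln's adapter is treated as
# transient: the model may produce valid structured output on retry.
schema_error = ValueError(
    "This task requires a specific output schema. Output did not match."
)
assert _is_retryable_error(schema_error)

# Any other ValueError is a permanent failure and is not retried.
assert not _is_retryable_error(ValueError("bad eval config"))
```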