kiln_ai.adapters.eval.eval_runner
```python
import json
import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import DatasetFilterId, dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalDataType, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun, Usage
from kiln_ai.utils.async_job_runner import AsyncJobRunner, Progress

logger = logging.getLogger(__name__)
```
```python
@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both eval_config and task_run_config should be
    # set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None
```
```python
class EvalRunner:
```
Runs an eval. Jobs are executed asynchronously, which speeds up runs against remote or fast model providers.
Can run an eval in two modes:

1) eval_config_eval: evaluate an eval config using existing dataset items.
2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
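For orientation, a minimal construction sketch for each mode. The placeholder lists stand in for `EvalConfig` and `TaskRunConfig` objects loaded from a Kiln project; how you load them is elided here.

```python
from kiln_ai.adapters.eval.eval_runner import EvalRunner

eval_configs = [...]      # EvalConfig instances sharing one parent eval
task_run_configs = [...]  # TaskRunConfig instances for the same parent task

# Mode 1: score existing dataset items (saved input and output) with each eval config.
config_eval_runner = EvalRunner(
    eval_configs=eval_configs,
    run_configs=None,  # must be None in this mode
    eval_run_type="eval_config_eval",
)

# Mode 2: generate fresh output with each run config, then score it.
task_run_eval_runner = EvalRunner(
    eval_configs=eval_configs,
    run_configs=task_run_configs,
    eval_run_type="task_run_eval",
)
```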
```python
def __init__(
    self,
    eval_configs: List[EvalConfig],
    run_configs: List[TaskRunConfig] | None,
    eval_run_type: Literal["eval_config_eval", "task_run_eval"],
):
    if len(eval_configs) == 0:
        raise ValueError("Eval runner requires at least one eval config")
    target_eval = eval_configs[0].parent_eval()
    if target_eval is None:
        raise ValueError("Eval config requires a parent eval")
    for eval_config in eval_configs:
        parent_eval = eval_config.parent_eval()
        if parent_eval is None:
            raise ValueError("Eval config requires a parent eval")
        if parent_eval.id != target_eval.id:
            raise ValueError("All eval configs must have the same parent eval")

    target_task = target_eval.parent_task()
    if target_task is None:
        raise ValueError("Eval config requires a (grand)parent task")

    # Check that run_configs is compatible
    if eval_run_type == "task_run_eval":
        if run_configs is None or len(run_configs) == 0:
            raise ValueError("Task run eval requires run configs")
        for run_config in run_configs:
            parent_task = run_config.parent_task()
            if parent_task is None:
                raise ValueError("All run configs must have a parent task")
            if parent_task.id != target_task.id:
                raise ValueError(
                    "Run config is not for the same task as the eval configs"
                )
    else:
        if run_configs is not None:
            raise ValueError("Mode 'eval_config_eval' does not support run configs")

    self.eval_run_type = eval_run_type
    self.eval_configs = eval_configs
    self.run_configs = run_configs
    self.task = target_task
    self.eval = target_eval
```
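The constructor fails fast rather than deferring validation to run time. For example, grounded directly in the first check above, an empty eval config list raises immediately:

```python
from kiln_ai.adapters.eval.eval_runner import EvalRunner

try:
    EvalRunner(eval_configs=[], run_configs=None, eval_run_type="task_run_eval")
except ValueError as e:
    print(e)  # -> "Eval runner requires at least one eval config"
```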
```python
def collect_tasks(self) -> List[EvalJob]:
    if self.eval_run_type == "eval_config_eval":
        if self.eval.eval_configs_filter_id is not None:
            return self.collect_tasks_for_eval_config_eval(
                self.eval.eval_configs_filter_id
            )
        else:
            raise ValueError(
                "Eval configs filter ID is required for eval runs of type 'eval_config_eval'"
            )
    else:
        return self.collect_tasks_for_task_run_eval()
```
```python
def collect_tasks_for_eval_config_eval(
    self, eval_configs_filter_id: DatasetFilterId
) -> List[EvalJob]:
    """
    Collect all jobs for this run, excluding any that have already been run.

    This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

    The tasks:
    - should be in the eval config set filter
    - should not have already been run for this eval config + dataset item pair
    """
    filter = dataset_filter_from_id(eval_configs_filter_id)

    # already_run[eval_config_id] -> set of dataset_ids already scored
    already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
    for eval_config in self.eval_configs:
        already_run[eval_config.id] = set()
        for run in eval_config.runs(readonly=True):
            already_run[eval_config.id].add(run.dataset_id)

    return [
        EvalJob(
            item=task_run,
            eval_config=eval_config,
            type="eval_config_eval",
        )
        for task_run in self.task.runs(readonly=True)
        if filter(task_run)
        for eval_config in self.eval_configs
        if task_run.id not in already_run[eval_config.id]
    ]
```
Collect all jobs for this run, excluding any that have already been run.
This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).
The tasks:
- should be in the eval config set filter
- should not have already been run for this eval config + dataset item pair
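The dedup bookkeeping is just a set of dataset IDs per eval config; a toy sketch of the same skip logic, with plain strings standing in for kiln_ai objects:

```python
# already_run[eval_config_id] -> dataset IDs that config has already scored
already_run = {
    "cfg_a": {"item_1"},  # cfg_a has already scored item_1
    "cfg_b": set(),       # cfg_b has scored nothing yet
}
items = ["item_1", "item_2"]

jobs = [
    (item, cfg)
    for item in items
    for cfg in already_run
    if item not in already_run[cfg]
]
# -> [("item_1", "cfg_b"), ("item_2", "cfg_a"), ("item_2", "cfg_b")]
```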
```python
def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
    """
    Collect all jobs for this run, excluding any that have already been run.

    This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

    The tasks:
    - should be in the eval set filter
    - should not have already been run for this eval config + run config + dataset item
    """
    filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

    # already_run[eval_config_id][run_config_id] -> set of dataset_ids already scored
    already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
    for eval_config in self.eval_configs:
        already_run[eval_config.id] = {}
        for run_config in self.run_configs or []:
            already_run[eval_config.id][run_config.id] = set()
        for run in eval_config.runs(readonly=True):
            if (
                run.task_run_config_id is not None
                and run.task_run_config_id in already_run[eval_config.id]
            ):
                already_run[eval_config.id][run.task_run_config_id].add(
                    run.dataset_id
                )

    return [
        EvalJob(
            item=task_run,
            task_run_config=run_config,
            type="task_run_eval",
            eval_config=eval_config,
        )
        for task_run in self.task.runs(readonly=True)
        if filter(task_run)
        for eval_config in self.eval_configs
        for run_config in self.run_configs or []
        if task_run.id not in already_run[eval_config.id][run_config.id]
    ]
```
Collect all jobs for this run, excluding any that have already been run.
This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.
The tasks:
- should be in the eval set filter
- should not have already been run for this eval config + run config + dataset item
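Same idea as the previous sketch, with one more level of keying for the run config; again a toy illustration with strings in place of kiln_ai objects:

```python
# already_run[eval_config_id][run_config_id] -> dataset IDs already scored
already_run = {
    "cfg_a": {
        "run_cfg_1": {"item_1"},
        "run_cfg_2": set(),
    },
}
items = ["item_1", "item_2"]

jobs = [
    (item, cfg, run_cfg)
    for item in items
    for cfg in already_run
    for run_cfg in already_run[cfg]
    if item not in already_run[cfg][run_cfg]
]
# -> [("item_1", "cfg_a", "run_cfg_2"),
#     ("item_2", "cfg_a", "run_cfg_1"),
#     ("item_2", "cfg_a", "run_cfg_2")]
```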
```python
async def run(self, concurrency: int = 25) -> AsyncGenerator[Progress, None]:
    """
    Runs the configured eval run with parallel workers and yields progress updates.
    """
    jobs = self.collect_tasks()

    runner = AsyncJobRunner(
        concurrency=concurrency,
        jobs=jobs,
        run_job_fn=self.run_job,
    )
    async for progress in runner.run():
        yield progress
```
Runs the configured eval run with parallel workers and yields progress updates.
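A sketch of driving the runner from asyncio. Here `runner` is assumed to be an EvalRunner built as in the earlier example, and the exact Progress fields are defined in kiln_ai.utils.async_job_runner, so the sketch just prints each update:

```python
import asyncio

async def main() -> None:
    async for progress in runner.run(concurrency=10):
        print(progress)  # periodic Progress update as jobs finish

asyncio.run(main())
```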
```python
async def run_job(self, job: EvalJob) -> bool:
    try:
        # Create the evaluator for this eval config/run config pair
        evaluator = eval_adapter_from_type(job.eval_config.config_type)(
            job.eval_config,
            job.task_run_config.run_config_properties
            if job.task_run_config
            else None,
        )
        if not isinstance(evaluator, BaseEval):
            raise ValueError("Not able to create evaluator from eval config")

        task_output: str | None = None
        reference_answer: str | None = None
        trace: str | None = None
        scores: EvalScores | None = None
        intermediate_outputs: Dict[str, str] | None = None
        task_run_usage: Usage | None = None
        if job.type == "eval_config_eval":
            # Eval config eval: score the task run's saved output, without invoking the task again
            scores, intermediate_outputs = await evaluator.run_eval(job.item)
            task_output = job.item.output.output
            task_run_usage = job.item.usage
        else:
            # Task run eval: invoke the task again to get fresh output, then score it
            (
                result_task_run,
                scores,
                intermediate_outputs,
            ) = await evaluator.run_task_and_eval(job.item)
            task_output = result_task_run.output.output
            task_run_usage = result_task_run.usage

            parent_eval = job.eval_config.parent_eval()
            if (
                parent_eval
                and parent_eval.evaluation_data_type == EvalDataType.full_trace
                and result_task_run.trace
            ):
                trace = json.dumps(result_task_run.trace, indent=2)

            if (
                parent_eval
                and parent_eval.evaluation_data_type
                == EvalDataType.reference_answer
            ):
                reference_answer = job.item.output.output

        # Save the job result
        eval_run = EvalRun(
            parent=job.eval_config,
            task_run_config_id=job.task_run_config.id
            if job.task_run_config
            else None,
            dataset_id=job.item.id,
            eval_config_eval=job.type == "eval_config_eval",
            scores=scores,
            input=job.item.input,
            output=task_output,
            reference_answer=reference_answer,
            intermediate_outputs=intermediate_outputs,
            task_run_trace=trace,
            task_run_usage=task_run_usage,
        )
        eval_run.save_to_file()

        return True
    except Exception as e:
        logger.error(
            f"Error running eval job for dataset item {job.item.id}: {e}",
            exc_info=True,
        )
        return False
```
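run_job returns True or False instead of raising, so one failed dataset item never aborts the whole run. The AsyncJobRunner it plugs into is not shown on this page, but the contract resembles this minimal semaphore-bounded sketch (an illustration of the pattern, not the kiln_ai implementation; SimpleProgress is a hypothetical stand-in for Progress):

```python
import asyncio
from dataclasses import dataclass
from typing import AsyncGenerator, Awaitable, Callable, List, TypeVar

T = TypeVar("T")

@dataclass
class SimpleProgress:
    complete: int
    errors: int
    total: int

async def run_jobs(
    jobs: List[T],
    run_job_fn: Callable[[T], Awaitable[bool]],
    concurrency: int = 25,
) -> AsyncGenerator[SimpleProgress, None]:
    # At most `concurrency` jobs are in flight at once.
    semaphore = asyncio.Semaphore(concurrency)

    async def bounded(job: T) -> bool:
        async with semaphore:
            return await run_job_fn(job)

    complete = errors = 0
    # Yield a progress snapshot as each job finishes, in completion order.
    for future in asyncio.as_completed([bounded(job) for job in jobs]):
        if await future:
            complete += 1
        else:
            errors += 1
        yield SimpleProgress(complete=complete, errors=errors, total=len(jobs))
```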