kiln_ai.adapters.eval.eval_runner
import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun
from kiln_ai.utils.async_job_runner import AsyncJobRunner, Progress

logger = logging.getLogger(__name__)
@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None
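As a rough sketch of that invariant (not code from this module), the snippet below builds one job of each type; eval_config, run_config, and task_run are hypothetical names standing in for already-loaded Kiln datamodel objects.

from kiln_ai.adapters.eval.eval_runner import EvalJob

# eval_config, run_config and task_run are assumed, already-loaded datamodel
# objects (EvalConfig, TaskRunConfig, TaskRun); loading them is not shown here.

# task_run_eval: re-run the task for this dataset item, then score the fresh output.
rerun_job = EvalJob(
    item=task_run,
    type="task_run_eval",
    eval_config=eval_config,
    task_run_config=run_config,
)

# eval_config_eval: score the item's saved output; no task_run_config is set.
saved_output_job = EvalJob(
    item=task_run,
    type="eval_config_eval",
    eval_config=eval_config,
)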
class EvalRunner:
Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.
Can run an eval in 2 modes:
1) eval_config_eval: evaluate an eval config using existing dataset items.
2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
    def __init__(
        self,
        eval_configs: List[EvalConfig],
        run_configs: List[TaskRunConfig] | None,
        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
    ):
        if len(eval_configs) == 0:
            raise ValueError("Eval runner requires at least one eval config")
        target_eval = eval_configs[0].parent_eval()
        if target_eval is None:
            raise ValueError("Eval config requires a parent eval")
        for eval_config in eval_configs:
            parent_eval = eval_config.parent_eval()
            if parent_eval is None:
                raise ValueError("Eval config requires a parent eval")
            if parent_eval.id != target_eval.id:
                raise ValueError("All eval configs must have the same parent eval")

        target_task = target_eval.parent_task()
        if target_task is None:
            raise ValueError("Eval config requires a (grand)parent task")

        # Check that run_configs is compatible
        if eval_run_type == "task_run_eval":
            if run_configs is None or len(run_configs) == 0:
                raise ValueError("Task run eval requires run configs")
            for run_config in run_configs:
                parent_task = run_config.parent_task()
                if parent_task is None:
                    raise ValueError("All run configs must have a parent task")
                if parent_task.id != target_task.id:
                    raise ValueError(
                        "Run config is not for the same task as the eval configs"
                    )
        else:
            if run_configs is not None:
                raise ValueError("Mode 'eval_config_eval' does not support run configs")

        self.eval_run_type = eval_run_type
        self.eval_configs = eval_configs
        self.run_configs = run_configs
        self.task = target_task
        self.eval = target_eval

    def collect_tasks(self) -> List[EvalJob]:
        if self.eval_run_type == "eval_config_eval":
            return self.collect_tasks_for_eval_config_eval()
        else:
            return self.collect_tasks_for_task_run_eval()
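A minimal usage sketch, assuming eval_config (an EvalConfig) and run_config (a TaskRunConfig) have already been loaded and belong to the same parent task; it runs the task_run_eval mode and prints each progress update as it arrives.

import asyncio

from kiln_ai.adapters.eval.eval_runner import EvalRunner


async def main() -> None:
    # eval_config and run_config are assumed, already-loaded Kiln objects
    # attached to the same parent task; otherwise __init__ raises ValueError.
    runner = EvalRunner(
        eval_configs=[eval_config],
        run_configs=[run_config],
        eval_run_type="task_run_eval",
    )
    async for progress in runner.run(concurrency=10):
        # Progress is defined in kiln_ai.utils.async_job_runner; its fields are
        # not shown in this module, so the update is just printed as-is.
        print(progress)


asyncio.run(main())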
    def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

        The tasks:
        - should be in the eval config set filter
        - should not have already been run for this eval config + dataset item pair
        """
        filter = dataset_filter_from_id(self.eval.eval_configs_filter_id)

        # already_run[eval_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = set()
            for run in eval_config.runs(readonly=True):
                already_run[eval_config.id].add(run.dataset_id)

        return [
            EvalJob(
                item=task_run,
                eval_config=eval_config,
                type="eval_config_eval",
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            if task_run.id not in already_run[eval_config.id]
        ]
Collect all jobs for this run, excluding any that have already been run.
This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).
The tasks:
- should be in the eval config set filter
- should not have already been run for this eval config + dataset item pair
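For the eval_config_eval mode described above, run_configs must be None. The sketch below (eval_config again stands in for an already-loaded EvalConfig) previews how many eval config + dataset item pairs still need scoring before anything runs:

from collections import Counter

from kiln_ai.adapters.eval.eval_runner import EvalRunner

# eval_config is an assumed, already-loaded EvalConfig with a parent eval and task.
runner = EvalRunner(
    eval_configs=[eval_config],
    run_configs=None,
    eval_run_type="eval_config_eval",
)

# collect_tasks() already excludes pairs that have a saved EvalRun.
pending = runner.collect_tasks()
per_config = Counter(job.eval_config.id for job in pending)
print(f"{len(pending)} jobs pending: {dict(per_config)}")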
    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

        The tasks:
        - should be in the eval set filter
        - should not have already been run for this eval config + run config + dataset item
        """
        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

        # already_run[eval_config_id][run_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = {}
            for run_config in self.run_configs or []:
                already_run[eval_config.id][run_config.id] = set()
            for run in eval_config.runs(readonly=True):
                if (
                    run.task_run_config_id is not None
                    and run.task_run_config_id in already_run[eval_config.id]
                ):
                    already_run[eval_config.id][run.task_run_config_id].add(
                        run.dataset_id
                    )

        return [
            EvalJob(
                item=task_run,
                task_run_config=run_config,
                type="task_run_eval",
                eval_config=eval_config,
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            for run_config in self.run_configs or []
            if task_run.id not in already_run[eval_config.id][run_config.id]
        ]
Collect all jobs for this run, excluding any that have already been run.
This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.
The tasks:
- should be in the eval set filter
- should not have already been run for this eval config + run config + dataset item
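Because task_run_eval jobs are the cross product of filtered dataset items, eval configs, and run configs (minus anything already recorded), it can help to see the remaining work per eval config / run config pair. A sketch, assuming runner is an EvalRunner already constructed in task_run_eval mode:

from collections import Counter

# runner is an assumed EvalRunner built with eval_run_type="task_run_eval".
pending = runner.collect_tasks()
per_pair = Counter(
    (job.eval_config.id, job.task_run_config.id if job.task_run_config else None)
    for job in pending
)
for (eval_config_id, run_config_id), count in per_pair.items():
    print(f"eval_config={eval_config_id} run_config={run_config_id}: {count} items left")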
    async def run(self, concurrency: int = 25) -> AsyncGenerator[Progress, None]:
        """
        Runs the configured eval run with parallel workers and yields progress updates.
        """
        jobs = self.collect_tasks()

        runner = AsyncJobRunner(concurrency=concurrency)
        async for progress in runner.run(jobs, self.run_job):
            yield progress
Runs the configured eval run with parallel workers and yields progress updates.
    async def run_job(self, job: EvalJob) -> bool:
        try:
            # Create the evaluator for this eval config/run config pair
            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
                job.eval_config,
                job.task_run_config.run_config() if job.task_run_config else None,
            )
            if not isinstance(evaluator, BaseEval):
                raise ValueError("Not able to create evaluator from eval config")

            task_output: str | None = None
            scores: EvalScores | None = None
            intermediate_outputs: Dict[str, str] | None = None
            if job.type == "eval_config_eval":
                # Eval config eval, we use the saved input from the task run, not invoking the task again
                scores, intermediate_outputs = await evaluator.run_eval(job.item)
                task_output = job.item.output.output
            else:
                # Task run eval, we invoke the task again to get a fresh output
                (
                    result_task_run,
                    scores,
                    intermediate_outputs,
                ) = await evaluator.run_task_and_eval(job.item.input)
                task_output = result_task_run.output.output

            # Save the job result
            eval_run = EvalRun(
                parent=job.eval_config,
                task_run_config_id=job.task_run_config.id
                if job.task_run_config
                else None,
                dataset_id=job.item.id,
                eval_config_eval=job.type == "eval_config_eval",
                scores=scores,
                input=job.item.input,
                output=task_output,
                intermediate_outputs=intermediate_outputs,
            )
            eval_run.save_to_file()

            return True
        except Exception as e:
            logger.error(f"Error running eval job for dataset item {job.item.id}: {e}")
            return False
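Because each successful job is persisted as an EvalRun and collect_tasks() skips items that already have one, building a fresh runner and calling run() again only retries jobs that previously failed (run_job returned False) or were never attempted. A sketch of that retry loop, under the same assumption of pre-loaded configs:

import asyncio

from kiln_ai.adapters.eval.eval_runner import EvalRunner


async def run_until_complete(eval_configs, run_configs, max_attempts: int = 3) -> None:
    # Each pass collects only jobs without a saved EvalRun, so re-running acts
    # as a retry of anything that failed on an earlier pass.
    for _ in range(max_attempts):
        runner = EvalRunner(
            eval_configs=eval_configs,
            run_configs=run_configs,
            eval_run_type="task_run_eval",
        )
        if not runner.collect_tasks():
            return
        async for _progress in runner.run(concurrency=10):
            pass


# eval_config and run_config are assumed, already-loaded Kiln objects.
asyncio.run(run_until_complete([eval_config], [run_config]))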