kiln_ai.adapters.eval.eval_runner

import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun, Usage
from kiln_ai.utils.async_job_runner import AsyncJobRunner, Progress

logger = logging.getLogger(__name__)


@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None


class EvalRunner:
    """
    Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

    Can run an eval in 2 modes:
    1) eval_config_eval: evaluate an eval config using existing dataset items.
    2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
    """

    def __init__(
        self,
        eval_configs: List[EvalConfig],
        run_configs: List[TaskRunConfig] | None,
        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
    ):
        if len(eval_configs) == 0:
            raise ValueError("Eval runner requires at least one eval config")
        target_eval = eval_configs[0].parent_eval()
        if target_eval is None:
            raise ValueError("Eval config requires a parent eval")
        for eval_config in eval_configs:
            parent_eval = eval_config.parent_eval()
            if parent_eval is None:
                raise ValueError("Eval config requires a parent eval")
            if parent_eval.id != target_eval.id:
                raise ValueError("All eval configs must have the same parent eval")

        target_task = target_eval.parent_task()
        if target_task is None:
            raise ValueError("Eval config requires a (grand)parent task")

        # Check that run_configs is compatible
        if eval_run_type == "task_run_eval":
            if run_configs is None or len(run_configs) == 0:
                raise ValueError("Task run eval requires run configs")
            for run_config in run_configs:
                parent_task = run_config.parent_task()
                if parent_task is None:
                    raise ValueError("All run configs must have a parent task")
                if parent_task.id != target_task.id:
                    raise ValueError(
                        "Run config is not for the same task as the eval configs"
                    )
        else:
            if run_configs is not None:
                raise ValueError("Mode 'eval_config_eval' does not support run configs")

        self.eval_run_type = eval_run_type
        self.eval_configs = eval_configs
        self.run_configs = run_configs
        self.task = target_task
        self.eval = target_eval

    def collect_tasks(self) -> List[EvalJob]:
        if self.eval_run_type == "eval_config_eval":
            return self.collect_tasks_for_eval_config_eval()
        else:
            return self.collect_tasks_for_task_run_eval()

    def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

        The tasks:
        - should be in the eval config set filter
        - should not have already been run for this eval config + dataset item pair
        """
        filter = dataset_filter_from_id(self.eval.eval_configs_filter_id)

        # already_run[eval_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = set()
            for run in eval_config.runs(readonly=True):
                already_run[eval_config.id].add(run.dataset_id)

        return [
            EvalJob(
                item=task_run,
                eval_config=eval_config,
                type="eval_config_eval",
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            if task_run.id not in already_run[eval_config.id]
        ]

    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

        The tasks:
        - should be in the eval set filter
        - should not have already been run for this eval config + run config + dataset item
        """
        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

        # already_run[eval_config_id][run_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = {}
            for run_config in self.run_configs or []:
                already_run[eval_config.id][run_config.id] = set()
            for run in eval_config.runs(readonly=True):
                if (
                    run.task_run_config_id is not None
                    and run.task_run_config_id in already_run[eval_config.id]
                ):
                    already_run[eval_config.id][run.task_run_config_id].add(
                        run.dataset_id
                    )

        return [
            EvalJob(
                item=task_run,
                task_run_config=run_config,
                type="task_run_eval",
                eval_config=eval_config,
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            for run_config in self.run_configs or []
            if task_run.id not in already_run[eval_config.id][run_config.id]
        ]

    async def run(self, concurrency: int = 25) -> AsyncGenerator[Progress, None]:
        """
        Runs the configured eval run with parallel workers and yields progress updates.
        """
        jobs = self.collect_tasks()

        runner = AsyncJobRunner(
            concurrency=concurrency,
            jobs=jobs,
            run_job_fn=self.run_job,
        )
        async for progress in runner.run():
            yield progress

    async def run_job(self, job: EvalJob) -> bool:
        try:
            # Create the evaluator for this eval config/run config pair
            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
                job.eval_config,
                job.task_run_config.run_config_properties
                if job.task_run_config
                else None,
            )
            if not isinstance(evaluator, BaseEval):
                raise ValueError("Not able to create evaluator from eval config")

            task_output: str | None = None
            scores: EvalScores | None = None
            intermediate_outputs: Dict[str, str] | None = None
            task_run_usage: Usage | None = None
            if job.type == "eval_config_eval":
                # Eval config eval, we use the saved input from the task run, not invoking the task again
                scores, intermediate_outputs = await evaluator.run_eval(job.item)
                task_output = job.item.output.output
                task_run_usage = job.item.usage
            else:
                # Task run eval, we invoke the task again to get a fresh output
                (
                    result_task_run,
                    scores,
                    intermediate_outputs,
                ) = await evaluator.run_task_and_eval(job.item.input)
                task_output = result_task_run.output.output
                task_run_usage = result_task_run.usage

            # Save the job result
            eval_run = EvalRun(
                parent=job.eval_config,
                task_run_config_id=job.task_run_config.id
                if job.task_run_config
                else None,
                dataset_id=job.item.id,
                eval_config_eval=job.type == "eval_config_eval",
                scores=scores,
                input=job.item.input,
                output=task_output,
                intermediate_outputs=intermediate_outputs,
                task_run_usage=task_run_usage,
            )
            eval_run.save_to_file()

            return True
        except Exception as e:
            logger.error(
                f"Error running eval job for dataset item {job.item.id}: {e}",
                exc_info=True,
            )
            return False
logger = <Logger kiln_ai.adapters.eval.eval_runner (WARNING)>
@dataclass
class EvalJob:
EvalJob( item: kiln_ai.datamodel.TaskRun, type: Literal['task_run_eval', 'eval_config_eval'], eval_config: kiln_ai.datamodel.eval.EvalConfig, task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None)
type: Literal['task_run_eval', 'eval_config_eval']
task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None
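A minimal sketch of the two job shapes, assuming the TaskRun, EvalConfig, and TaskRunConfig objects are already loaded from an existing Kiln project (the build_jobs helper is illustrative, not part of the library):

from typing import List

from kiln_ai.adapters.eval.eval_runner import EvalJob
from kiln_ai.datamodel.eval import EvalConfig
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun


def build_jobs(
    task_run: TaskRun, eval_config: EvalConfig, run_config: TaskRunConfig
) -> List[EvalJob]:
    # "task_run_eval": both eval_config and task_run_config are set.
    rerun_job = EvalJob(
        item=task_run,
        type="task_run_eval",
        eval_config=eval_config,
        task_run_config=run_config,
    )
    # "eval_config_eval": only eval_config is set; task_run_config stays None.
    existing_output_job = EvalJob(
        item=task_run,
        type="eval_config_eval",
        eval_config=eval_config,
    )
    return [rerun_job, existing_output_job]

In normal use these jobs come from EvalRunner.collect_tasks() rather than being built by hand.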
class EvalRunner:

Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

Can run an eval in 2 modes:

  1) eval_config_eval: evaluate an eval config using existing dataset items.
  2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.

EvalRunner( eval_configs: List[kiln_ai.datamodel.eval.EvalConfig], run_configs: Optional[List[kiln_ai.datamodel.task.TaskRunConfig]], eval_run_type: Literal['eval_config_eval', 'task_run_eval'])
eval_run_type
eval_configs
run_configs
task
eval
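A hedged construction sketch for both modes. It assumes the EvalConfig and TaskRunConfig lists were already loaded and share the same parent eval and task (the constructor enforces this); the build_runners helper is illustrative only:

from typing import List, Tuple

from kiln_ai.adapters.eval.eval_runner import EvalRunner
from kiln_ai.datamodel.eval import EvalConfig
from kiln_ai.datamodel.task import TaskRunConfig


def build_runners(
    eval_configs: List[EvalConfig],
    run_configs: List[TaskRunConfig],
) -> Tuple[EvalRunner, EvalRunner]:
    # Mode 1: score the dataset's existing outputs; run_configs must be None.
    config_eval_runner = EvalRunner(
        eval_configs=eval_configs,
        run_configs=None,
        eval_run_type="eval_config_eval",
    )
    # Mode 2: regenerate output with each run config, then score it.
    task_run_eval_runner = EvalRunner(
        eval_configs=eval_configs,
        run_configs=run_configs,
        eval_run_type="task_run_eval",
    )
    return config_eval_runner, task_run_eval_runner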
def collect_tasks(self) -> List[EvalJob]:
def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

The tasks:

  • should be in the eval config set filter
  • should not have already been run for this eval config + dataset item pair
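For example, the pending work can be previewed before running. The preview_pending helper below is an assumed sketch, not library API; for an "eval_config_eval" runner, collect_tasks() dispatches to this method and skips (eval config, dataset item) pairs that already have a saved EvalRun:

from typing import List

from kiln_ai.adapters.eval.eval_runner import EvalJob, EvalRunner


def preview_pending(runner: EvalRunner) -> List[EvalJob]:
    # Each remaining job pairs one eval config with one dataset item.
    pending = runner.collect_tasks()
    for job in pending:
        print(f"eval_config={job.eval_config.id} dataset_item={job.item.id}")
    return pending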
def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

The tasks:

  • should be in the eval set filter
  • should not have already been run for this eval config + run config + dataset item
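In this mode each job also carries a task_run_config, so the remaining (eval config, run config, dataset item) combinations can be tallied per run config. The helper below is an illustrative sketch, not library API:

from collections import Counter

from kiln_ai.adapters.eval.eval_runner import EvalRunner


def pending_per_run_config(runner: EvalRunner) -> Counter:
    # Count the jobs collect_tasks() still has to do, grouped by run config id.
    counts: Counter = Counter()
    for job in runner.collect_tasks():
        if job.task_run_config is not None:
            counts[job.task_run_config.id] += 1
    return counts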
async def run( self, concurrency: int = 25) -> AsyncGenerator[kiln_ai.utils.async_job_runner.Progress, NoneType]:

Runs the configured eval run with parallel workers and yields progress updates.
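A usage sketch, assuming a runner built as above. Progress objects are simply printed here because their fields are defined in kiln_ai.utils.async_job_runner rather than in this module:

import asyncio

from kiln_ai.adapters.eval.eval_runner import EvalRunner


async def run_and_report(runner: EvalRunner, concurrency: int = 10) -> None:
    # run() yields one Progress update at a time; failed jobs are logged by
    # run_job() and signalled via its False return value rather than raising.
    async for progress in runner.run(concurrency=concurrency):
        print(progress)


# asyncio.run(run_and_report(runner))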

async def run_job(self, job: EvalJob) -> bool:
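run_job builds the evaluator for the job's eval config, produces scores (re-running the task first in "task_run_eval" mode), saves an EvalRun as a child of the eval config, and returns True on success or False after logging any exception. A hedged single-job sketch (the run_single wrapper is illustrative):

import asyncio

from kiln_ai.adapters.eval.eval_runner import EvalJob, EvalRunner


async def run_single(runner: EvalRunner, job: EvalJob) -> bool:
    # True once the EvalRun has been saved to file under the job's eval config;
    # False if the job failed (the exception is logged, not re-raised).
    return await runner.run_job(job)


# ok = asyncio.run(run_single(runner, runner.collect_tasks()[0]))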