kiln_ai.adapters.eval.eval_runner

  1import logging
  2from dataclasses import dataclass
  3from typing import AsyncGenerator, Dict, List, Literal, Set
  4
  5from kiln_ai.adapters.eval.base_eval import BaseEval
  6from kiln_ai.adapters.eval.registry import eval_adapter_from_type
  7from kiln_ai.datamodel.basemodel import ID_TYPE
  8from kiln_ai.datamodel.dataset_filters import dataset_filter_from_id
  9from kiln_ai.datamodel.eval import EvalConfig, EvalRun, EvalScores
 10from kiln_ai.datamodel.task import TaskRunConfig
 11from kiln_ai.datamodel.task_run import TaskRun
 12from kiln_ai.utils.async_job_runner import AsyncJobRunner, Progress
 13
 14logger = logging.getLogger(__name__)
 15
 16
 17@dataclass
 18class EvalJob:
 19    item: TaskRun
 20    type: Literal["task_run_eval", "eval_config_eval"]
 21    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
 22    eval_config: EvalConfig
 23    task_run_config: TaskRunConfig | None = None
 24
 25
 26class EvalRunner:
 27    """
 28    Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.
 29
 30    Can run an eval in 2 modes:
 31    1) eval_config_eval: evaluate an eval config using existing dataset items.
 32    2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
 33    """
 34
 35    def __init__(
 36        self,
 37        eval_configs: List[EvalConfig],
 38        run_configs: List[TaskRunConfig] | None,
 39        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
 40    ):
 41        if len(eval_configs) == 0:
 42            raise ValueError("Eval runner requires at least one eval config")
 43        target_eval = eval_configs[0].parent_eval()
 44        if target_eval is None:
 45            raise ValueError("Eval config requires a parent eval")
 46        for eval_config in eval_configs:
 47            parent_eval = eval_config.parent_eval()
 48            if parent_eval is None:
 49                raise ValueError("Eval config requires a parent eval")
 50            if parent_eval.id != target_eval.id:
 51                raise ValueError("All eval configs must have the same parent eval")
 52
 53        target_task = target_eval.parent_task()
 54        if target_task is None:
 55            raise ValueError("Eval config requires a (grand)parent task")
 56
 57        # Check that run_configs is compatible
 58        if eval_run_type == "task_run_eval":
 59            if run_configs is None or len(run_configs) == 0:
 60                raise ValueError("Task run eval requires run configs")
 61            for run_config in run_configs:
 62                parent_task = run_config.parent_task()
 63                if parent_task is None:
 64                    raise ValueError("All run configs must have a parent task")
 65                if parent_task.id != target_task.id:
 66                    raise ValueError(
 67                        "Run config is not for the same task as the eval configs"
 68                    )
 69        else:
 70            if run_configs is not None:
 71                raise ValueError("Mode 'eval_config_eval' does not support run configs")
 72
 73        self.eval_run_type = eval_run_type
 74        self.eval_configs = eval_configs
 75        self.run_configs = run_configs
 76        self.task = target_task
 77        self.eval = target_eval
 78
 79    def collect_tasks(self) -> List[EvalJob]:
 80        if self.eval_run_type == "eval_config_eval":
 81            return self.collect_tasks_for_eval_config_eval()
 82        else:
 83            return self.collect_tasks_for_task_run_eval()
 84
 85    def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:
 86        """
 87        Collect all jobs for this run, excluding any that have already been run.
 88
 89        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).
 90
 91        The tasks:
 92        - should be in the eval config set filter
 93        - should not have already been run for this eval config + dataset item pair
 94        """
 95        filter = dataset_filter_from_id(self.eval.eval_configs_filter_id)
 96
 97        # already_run[eval_config_id][dataset_id]
 98        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
 99        for eval_config in self.eval_configs:
100            already_run[eval_config.id] = set()
101            for run in eval_config.runs(readonly=True):
102                already_run[eval_config.id].add(run.dataset_id)
103
104        return [
105            EvalJob(
106                item=task_run,
107                eval_config=eval_config,
108                type="eval_config_eval",
109            )
110            for task_run in self.task.runs(readonly=True)
111            if filter(task_run)
112            for eval_config in self.eval_configs
113            if task_run.id not in already_run[eval_config.id]
114        ]
115
116    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
117        """
118        Collect all jobs for this run, excluding any that have already been run.
119
120        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.
121
122        The tasks:
123        - should be in the eval set filter
124        - should not have already been run for this eval config + run config + dataset item
125        """
126        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)
127
128        # already_run[eval_config_id][run_config_id][dataset_id]
129        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
130        for eval_config in self.eval_configs:
131            already_run[eval_config.id] = {}
132            for run_config in self.run_configs or []:
133                already_run[eval_config.id][run_config.id] = set()
134            for run in eval_config.runs(readonly=True):
135                if (
136                    run.task_run_config_id is not None
137                    and run.task_run_config_id in already_run[eval_config.id]
138                ):
139                    already_run[eval_config.id][run.task_run_config_id].add(
140                        run.dataset_id
141                    )
142
143        return [
144            EvalJob(
145                item=task_run,
146                task_run_config=run_config,
147                type="task_run_eval",
148                eval_config=eval_config,
149            )
150            for task_run in self.task.runs(readonly=True)
151            if filter(task_run)
152            for eval_config in self.eval_configs
153            for run_config in self.run_configs or []
154            if task_run.id not in already_run[eval_config.id][run_config.id]
155        ]
156
157    async def run(self, concurrency: int = 25) -> AsyncGenerator[Progress, None]:
158        """
159        Runs the configured eval run with parallel workers and yields progress updates.
160        """
161        jobs = self.collect_tasks()
162
163        runner = AsyncJobRunner(concurrency=concurrency)
164        async for progress in runner.run(jobs, self.run_job):
165            yield progress
166
167    async def run_job(self, job: EvalJob) -> bool:
168        try:
169            # Create the evaluator for this eval config/run config pair
170            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
171                job.eval_config,
172                job.task_run_config.run_config() if job.task_run_config else None,
173            )
174            if not isinstance(evaluator, BaseEval):
175                raise ValueError("Not able to create evaluator from eval config")
176
177            task_output: str | None = None
178            scores: EvalScores | None = None
179            intermediate_outputs: Dict[str, str] | None = None
180            if job.type == "eval_config_eval":
181                # Eval config eval, we use the saved input from the task run, not invoking the task again
182                scores, intermediate_outputs = await evaluator.run_eval(job.item)
183                task_output = job.item.output.output
184            else:
185                # Task run eval, we invoke the task again to get a fresh output
186                (
187                    result_task_run,
188                    scores,
189                    intermediate_outputs,
190                ) = await evaluator.run_task_and_eval(job.item.input)
191                task_output = result_task_run.output.output
192
193            # Save the job result
194            eval_run = EvalRun(
195                parent=job.eval_config,
196                task_run_config_id=job.task_run_config.id
197                if job.task_run_config
198                else None,
199                dataset_id=job.item.id,
200                eval_config_eval=job.type == "eval_config_eval",
201                scores=scores,
202                input=job.item.input,
203                output=task_output,
204                intermediate_outputs=intermediate_outputs,
205            )
206            eval_run.save_to_file()
207
208            return True
209        except Exception as e:
210            logger.error(f"Error running eval job for dataset item {job.item.id}: {e}")
211            return False
logger = <Logger kiln_ai.adapters.eval.eval_runner (WARNING)>
@dataclass
class EvalJob:
EvalJob(item: kiln_ai.datamodel.TaskRun, type: Literal['task_run_eval', 'eval_config_eval'], eval_config: kiln_ai.datamodel.eval.EvalConfig, task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None)
type: Literal['task_run_eval', 'eval_config_eval']
task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None
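
A minimal construction sketch of the invariant noted in the field comment, assuming eval_config, run_config, and task_run are already-loaded Kiln datamodel objects (hypothetical names):

    # Hypothetical, already-loaded objects: eval_config, run_config, task_run.
    # "task_run_eval" jobs carry both configs; "eval_config_eval" jobs carry
    # only the eval config.
    task_run_eval_job = EvalJob(
        item=task_run,
        type="task_run_eval",
        eval_config=eval_config,
        task_run_config=run_config,
    )

    eval_config_eval_job = EvalJob(
        item=task_run,
        type="eval_config_eval",
        eval_config=eval_config,
    )
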
class EvalRunner:

Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

Can run an eval in 2 modes:

 1) eval_config_eval: evaluate an eval config using existing dataset items.
 2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.

EvalRunner(eval_configs: List[kiln_ai.datamodel.eval.EvalConfig], run_configs: Optional[List[kiln_ai.datamodel.task.TaskRunConfig]], eval_run_type: Literal['eval_config_eval', 'task_run_eval'])
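
A hedged construction sketch, assuming eval_configs (all children of the same eval) and run_configs (children of that eval's parent task) were loaded elsewhere; the variable names are illustrative:

    # Mode 2: re-run the task for each run config, then evaluate the fresh output.
    task_run_eval_runner = EvalRunner(
        eval_configs=eval_configs,
        run_configs=run_configs,
        eval_run_type="task_run_eval",
    )

    # Mode 1: evaluate existing dataset items; run_configs must be None here,
    # otherwise the constructor raises ValueError.
    config_eval_runner = EvalRunner(
        eval_configs=eval_configs,
        run_configs=None,
        eval_run_type="eval_config_eval",
    )
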
eval_run_type
eval_configs
run_configs
task
eval
def collect_tasks(self) -> List[EvalJob]:
def collect_tasks_for_eval_config_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

The tasks:

  • should be in the eval config set filter
  • should not have already been run for this eval config + dataset item pair
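
An illustrative sketch (made-up IDs) of the deduplication index this method builds before emitting jobs:

    # already_run maps eval config id -> dataset item ids that already have a
    # saved EvalRun for that config. IDs below are purely illustrative.
    already_run = {
        "eval_config_1": {"item_1", "item_2"},
        "eval_config_2": {"item_1"},
    }
    # A job is emitted only for (eval config, filtered dataset item) pairs not
    # recorded here, e.g. ("eval_config_2", "item_2") would still be queued.
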
def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

The tasks:

  • should be in the eval set filter
  • should not have already been run for this eval config + run config + dataset item
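
An illustrative sketch (made-up IDs) of the nested deduplication index used in this mode:

    # already_run maps eval config id -> task run config id -> dataset item ids
    # that already have a saved EvalRun. IDs below are purely illustrative.
    already_run = {
        "eval_config_1": {
            "run_config_a": {"item_1"},
            "run_config_b": set(),
        },
    }
    # Jobs are the cross product of filtered dataset items, eval configs and
    # run configs, minus the combinations recorded here.
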
async def run(self, concurrency: int = 25) -> AsyncGenerator[kiln_ai.utils.async_job_runner.Progress, NoneType]:

Runs the configured eval run with parallel workers and yields progress updates.
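
A minimal sketch of driving an eval run, assuming runner is an EvalRunner built as in the earlier sketch; only the concurrency parameter and the yielded Progress objects come from this module:

    import asyncio

    async def main() -> None:
        # Each yielded value is a Progress update from the underlying
        # AsyncJobRunner; print it (or forward it to a UI) as it arrives.
        async for progress in runner.run(concurrency=10):
            print(progress)

    asyncio.run(main())
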

async def run_job(self, job: EvalJob) -> bool:
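
run_job has no docstring; per the source above, it builds the evaluator for the job's eval config, evaluates either the saved run (eval_config_eval) or a fresh task invocation (task_run_eval), saves an EvalRun, and returns True on success or False after logging the error. It is normally driven by run(), but here is a hedged sketch of calling it directly (runner assumed from the earlier sketch):

    import asyncio

    async def run_one_job() -> None:
        # collect_tasks() returns the outstanding EvalJobs for this runner.
        jobs = runner.collect_tasks()
        if jobs:
            ok = await runner.run_job(jobs[0])
            print("EvalRun saved" if ok else "job failed, see log")

    asyncio.run(run_one_job())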