kiln_ai.adapters.eval.eval_runner

import json
import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

import litellm

from kiln_ai.adapters.adapter_registry import load_skills_for_task
from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.adapters.model_adapters.base_adapter import SkillsDict
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import DatasetFilterId, dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalDataType, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun, Usage
from kiln_ai.utils.async_job_runner import AsyncJobRunner, Progress, RetryableError
from kiln_ai.utils.git_sync_protocols import SaveContext, default_save_context

logger = logging.getLogger(__name__)


@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None


class EvalRunner:
    """
    Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

    Can run an eval in 2 modes:
    1) eval_config_eval: evaluate an eval config using existing dataset items.
    2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
    """

    def __init__(
        self,
        eval_configs: List[EvalConfig],
        run_configs: List[TaskRunConfig] | None,
        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
        save_context: SaveContext | None = None,
    ):
        if len(eval_configs) == 0:
            raise ValueError("Eval runner requires at least one eval config")
        target_eval = eval_configs[0].parent_eval()
        if target_eval is None:
            raise ValueError("Eval config requires a parent eval")
        for eval_config in eval_configs:
            parent_eval = eval_config.parent_eval()
            if parent_eval is None:
                raise ValueError("Eval config requires a parent eval")
            if parent_eval.id != target_eval.id:
                raise ValueError("All eval configs must have the same parent eval")

        target_task = target_eval.parent_task()
        if target_task is None:
            raise ValueError("Eval config requires a (grand)parent task")

        # Check that run_configs is compatible
        if eval_run_type == "task_run_eval":
            if run_configs is None or len(run_configs) == 0:
                raise ValueError("Task run eval requires run configs")
            for run_config in run_configs:
                parent_task = run_config.parent_task()
                if parent_task is None:
                    raise ValueError("All run configs must have a parent task")
                if parent_task.id != target_task.id:
                    raise ValueError(
                        "Run config is not for the same task as the eval configs"
                    )
        else:
            if run_configs is not None:
                raise ValueError("Mode 'eval_config_eval' does not support run configs")

        self.eval_run_type = eval_run_type
        self.eval_configs = eval_configs
        self.run_configs = run_configs
        self.task = target_task
        self.eval = target_eval
        self._skills: SkillsDict = self._preload_skills()
        self._save_context: SaveContext = save_context or default_save_context

    def collect_tasks(self) -> List[EvalJob]:
        if self.eval_run_type == "eval_config_eval":
            if self.eval.eval_configs_filter_id is not None:
                return self.collect_tasks_for_eval_config_eval(
                    self.eval.eval_configs_filter_id
                )
            else:
                raise ValueError(
                    "Eval configs filter ID is required for eval runs of type 'eval_config_eval'"
                )

        else:
            return self.collect_tasks_for_task_run_eval()

    def collect_tasks_for_eval_config_eval(
        self, eval_configs_filter_id: DatasetFilterId
    ) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

        The tasks:
        - should be in the eval config set filter
        - should not have already been run for this eval config + dataset item pair
        """
        filter = dataset_filter_from_id(eval_configs_filter_id)

        # already_run[eval_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = set()
            for run in eval_config.runs(readonly=True):
                already_run[eval_config.id].add(run.dataset_id)

        return [
            EvalJob(
                item=task_run,
                eval_config=eval_config,
                type="eval_config_eval",
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            if task_run.id not in already_run[eval_config.id]
        ]

    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

        The tasks:
        - should be in the eval set filter
        - should not have already been run for this eval config + run config + dataset item
        """
        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

        # already_run[eval_config_id][run_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = {}
            for run_config in self.run_configs or []:
                already_run[eval_config.id][run_config.id] = set()
            for run in eval_config.runs(readonly=True):
                if (
                    run.task_run_config_id is not None
                    and run.task_run_config_id in already_run[eval_config.id]
                ):
                    already_run[eval_config.id][run.task_run_config_id].add(
                        run.dataset_id
                    )

        return [
            EvalJob(
                item=task_run,
                task_run_config=run_config,
                type="task_run_eval",
                eval_config=eval_config,
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            for run_config in self.run_configs or []
            if task_run.id not in already_run[eval_config.id][run_config.id]
        ]

    def _preload_skills(self) -> SkillsDict:
        """Collect all skill IDs from run configs and bulk-load them once."""
        if self.run_configs is None:
            return {}
        merged: SkillsDict = {}
        for rc in self.run_configs:
            skills = load_skills_for_task(self.task, rc.run_config_properties)
            merged.update(skills)
        return merged

    async def run(self, concurrency: int = 25) -> AsyncGenerator[Progress, None]:
        """
        Runs the configured eval run with parallel workers and yields progress updates.
        """
        jobs = self.collect_tasks()

        runner = AsyncJobRunner(
            concurrency=concurrency,
            jobs=jobs,
            run_job_fn=self.run_job,
            max_retries=2,
        )
        async for progress in runner.run():
            yield progress

    async def run_job(self, job: EvalJob) -> bool:
        try:
            # Create the evaluator for this eval config/run config pair
            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
                job.eval_config,
                job.task_run_config.run_config_properties
                if job.task_run_config
                else None,
                skills=self._skills,
            )
            if not isinstance(evaluator, BaseEval):
                raise ValueError("Not able to create evaluator from eval config")

            task_output: str | None = None
            reference_answer: str | None = None
            trace: str | None = None
            scores: EvalScores | None = None
            intermediate_outputs: Dict[str, str] | None = None
            task_run_usage: Usage | None = None
            if job.type == "eval_config_eval":
                # Eval config eval, we use the saved input from the task run, not invoking the task again
                scores, intermediate_outputs = await evaluator.run_eval(job.item)
                task_output = job.item.output.output
                task_run_usage = job.item.usage
            else:
                # Task run eval, we invoke the task again to get a fresh output
                (
                    result_task_run,
                    scores,
                    intermediate_outputs,
                ) = await evaluator.run_task_and_eval(job.item)
                task_output = result_task_run.output.output
                task_run_usage = result_task_run.usage

                parent_eval = job.eval_config.parent_eval()
                if (
                    parent_eval
                    and parent_eval.evaluation_data_type == EvalDataType.full_trace
                    and result_task_run.trace
                ):
                    trace = json.dumps(result_task_run.trace, indent=2)

                if (
                    parent_eval
                    and parent_eval.evaluation_data_type
                    == EvalDataType.reference_answer
                ):
                    reference_answer = job.item.output.output

            # Save the job result
            async with self._save_context():
                eval_run = EvalRun(
                    parent=job.eval_config,
                    task_run_config_id=job.task_run_config.id
                    if job.task_run_config
                    else None,
                    dataset_id=job.item.id,
                    eval_config_eval=job.type == "eval_config_eval",
                    scores=scores,
                    input=job.item.input,
                    output=task_output,
                    reference_answer=reference_answer,
                    intermediate_outputs=intermediate_outputs,
                    task_run_trace=trace,
                    task_run_usage=task_run_usage,
                )
                eval_run.save_to_file()

            return True
        except Exception as e:
            if _is_retryable_error(e):
                logger.error(
                    f"Transient error running eval job for dataset item {job.item.id}: {e}",
                    exc_info=True,
                )
                raise RetryableError(str(e)) from e
            logger.error(
                f"Error running eval job for dataset item {job.item.id}: {e}",
                exc_info=True,
            )
            raise


def _is_retryable_error(e: BaseException) -> bool:
    if isinstance(
        e,
        (
            litellm.RateLimitError,
            litellm.APIConnectionError,
            litellm.InternalServerError,
            litellm.ServiceUnavailableError,
            litellm.BadGatewayError,
            litellm.JSONSchemaValidationError,
        ),
    ):
        return True

    # ValueError thrown by Kiln's adapter when structured output doesn't match schema
    if isinstance(
        e, ValueError
    ) and "This task requires a specific output schema" in str(e):
        return True

    return False
logger = <Logger kiln_ai.adapters.eval.eval_runner (WARNING)>
@dataclass
class EvalJob:
EvalJob(
    item: kiln_ai.datamodel.TaskRun,
    type: Literal['task_run_eval', 'eval_config_eval'],
    eval_config: kiln_ai.datamodel.eval.EvalConfig,
    task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None,
)
type: Literal['task_run_eval', 'eval_config_eval']
task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None
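
Per the comment in the source, which fields must be set depends on the mode. A minimal sketch with hypothetical, pre-loaded objects (some_task_run, some_eval_config, some_run_config are stand-in names):

# "eval_config_eval": only eval_config is set; the saved task run output is reused.
config_eval_job = EvalJob(
    item=some_task_run,
    type="eval_config_eval",
    eval_config=some_eval_config,
)

# "task_run_eval": both eval_config and task_run_config are set; the task is re-run.
task_run_job = EvalJob(
    item=some_task_run,
    type="task_run_eval",
    eval_config=some_eval_config,
    task_run_config=some_run_config,
)
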
class EvalRunner:

Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

Can run an eval in 2 modes:

  1) eval_config_eval: evaluate an eval config using existing dataset items.
  2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
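
A minimal construction sketch (not taken from the library's docs) covering both modes, assuming eval_configs (List[EvalConfig]) and run_configs (List[TaskRunConfig]) have already been loaded from a Kiln project:

# Sketch: mode 2, re-running the task for each run config before scoring.
task_run_runner = EvalRunner(
    eval_configs=eval_configs,      # assumed to share the same parent eval
    run_configs=run_configs,        # assumed to belong to the eval's parent task
    eval_run_type="task_run_eval",
)

# Sketch: mode 1, scoring existing dataset items; run_configs must be None here.
config_eval_runner = EvalRunner(
    eval_configs=eval_configs,
    run_configs=None,
    eval_run_type="eval_config_eval",
)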

EvalRunner(
    eval_configs: List[kiln_ai.datamodel.eval.EvalConfig],
    run_configs: Optional[List[kiln_ai.datamodel.task.TaskRunConfig]],
    eval_run_type: Literal['eval_config_eval', 'task_run_eval'],
    save_context: Callable[[], contextlib.AbstractAsyncContextManager[None]] | None = None,
)
eval_run_type
eval_configs
run_configs
task
eval
def collect_tasks(self) -> List[EvalJob]:
def collect_tasks_for_eval_config_eval(self, eval_configs_filter_id: DatasetFilterId) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

The tasks:

  • should be in the eval config set filter
  • should not have already been run for this eval config + dataset item pair
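
The dedup rule above can be restated as a small predicate; this is an illustrative sketch only, with eval_config and task_run standing in for the objects iterated in the source:

def needs_eval_config_eval(eval_config, task_run) -> bool:
    # Skip the pair if this eval config already has an EvalRun for this dataset item.
    existing_dataset_ids = {run.dataset_id for run in eval_config.runs(readonly=True)}
    return task_run.id not in existing_dataset_ids
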
def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

The tasks:

  • should be in the eval set filter
  • should not have already been run for this eval config + run config + dataset item
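
Here the dedup key has three parts; an equivalent sketch, again with stand-in names:

def needs_task_run_eval(eval_config, run_config, task_run) -> bool:
    # Skip the combination if an EvalRun already exists for this run config and
    # dataset item under this eval config.
    existing = {
        (run.task_run_config_id, run.dataset_id)
        for run in eval_config.runs(readonly=True)
    }
    return (run_config.id, task_run.id) not in existing
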
async def run(self, concurrency: int = 25) -> AsyncGenerator[kiln_ai.utils.async_job_runner.Progress, None]:

Runs the configured eval run with parallel workers and yields progress updates.
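
A hedged sketch of driving the generator from a script, assuming runner is an EvalRunner constructed as shown earlier; the yielded Progress objects are simply printed:

import asyncio

async def run_all() -> None:
    async for progress in runner.run(concurrency=10):
        print(progress)  # Progress object from kiln_ai.utils.async_job_runner

asyncio.run(run_all())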

async def run_job(self, job: EvalJob) -> bool: