kiln_ai.adapters.eval.eval_runner

import json
import logging
from dataclasses import dataclass
from typing import AsyncGenerator, Dict, List, Literal, Set

from kiln_ai.adapters.eval.base_eval import BaseEval
from kiln_ai.adapters.eval.registry import eval_adapter_from_type
from kiln_ai.datamodel.basemodel import ID_TYPE
from kiln_ai.datamodel.dataset_filters import DatasetFilterId, dataset_filter_from_id
from kiln_ai.datamodel.eval import EvalConfig, EvalDataType, EvalRun, EvalScores
from kiln_ai.datamodel.task import TaskRunConfig
from kiln_ai.datamodel.task_run import TaskRun, Usage
from kiln_ai.utils.async_job_runner import AsyncJobRunner, Progress

logger = logging.getLogger(__name__)


@dataclass
class EvalJob:
    item: TaskRun
    type: Literal["task_run_eval", "eval_config_eval"]
    # If type == "task_run_eval", both of these should be set. If type == "eval_config_eval", only eval_config should be set.
    eval_config: EvalConfig
    task_run_config: TaskRunConfig | None = None


class EvalRunner:
    """
    Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

    Can run an eval in 2 modes:
    1) eval_config_eval: evaluate an eval config using existing dataset items.
    2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
    """

    def __init__(
        self,
        eval_configs: List[EvalConfig],
        run_configs: List[TaskRunConfig] | None,
        eval_run_type: Literal["eval_config_eval", "task_run_eval"],
    ):
        if len(eval_configs) == 0:
            raise ValueError("Eval runner requires at least one eval config")
        target_eval = eval_configs[0].parent_eval()
        if target_eval is None:
            raise ValueError("Eval config requires a parent eval")
        for eval_config in eval_configs:
            parent_eval = eval_config.parent_eval()
            if parent_eval is None:
                raise ValueError("Eval config requires a parent eval")
            if parent_eval.id != target_eval.id:
                raise ValueError("All eval configs must have the same parent eval")

        target_task = target_eval.parent_task()
        if target_task is None:
            raise ValueError("Eval config requires a (grand)parent task")

        # Check that run_configs is compatible
        if eval_run_type == "task_run_eval":
            if run_configs is None or len(run_configs) == 0:
                raise ValueError("Task run eval requires run configs")
            for run_config in run_configs:
                parent_task = run_config.parent_task()
                if parent_task is None:
                    raise ValueError("All run configs must have a parent task")
                if parent_task.id != target_task.id:
                    raise ValueError(
                        "Run config is not for the same task as the eval configs"
                    )
        else:
            if run_configs is not None:
                raise ValueError("Mode 'eval_config_eval' does not support run configs")

        self.eval_run_type = eval_run_type
        self.eval_configs = eval_configs
        self.run_configs = run_configs
        self.task = target_task
        self.eval = target_eval

    def collect_tasks(self) -> List[EvalJob]:
        if self.eval_run_type == "eval_config_eval":
            if self.eval.eval_configs_filter_id is not None:
                return self.collect_tasks_for_eval_config_eval(
                    self.eval.eval_configs_filter_id
                )
            else:
                raise ValueError(
                    "Eval configs filter ID is required for eval runs of type 'eval_config_eval'"
                )

        else:
            return self.collect_tasks_for_task_run_eval()

    def collect_tasks_for_eval_config_eval(
        self, eval_configs_filter_id: DatasetFilterId
    ) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

        The tasks:
        - should be in the eval config set filter
        - should not have already been run for this eval config + dataset item pair
        """
        filter = dataset_filter_from_id(eval_configs_filter_id)

        # already_run[eval_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Set[ID_TYPE]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = set()
            for run in eval_config.runs(readonly=True):
                already_run[eval_config.id].add(run.dataset_id)

        return [
            EvalJob(
                item=task_run,
                eval_config=eval_config,
                type="eval_config_eval",
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            if task_run.id not in already_run[eval_config.id]
        ]

    def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:
        """
        Collect all jobs for this run, excluding any that have already been run.

        This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

        The tasks:
        - should be in the eval set filter
        - should not have already been run for this eval config + run config + dataset item
        """
        filter = dataset_filter_from_id(self.eval.eval_set_filter_id)

        # already_run[eval_config_id][run_config_id][dataset_id]
        already_run: Dict[ID_TYPE, Dict[ID_TYPE, Set[ID_TYPE]]] = {}
        for eval_config in self.eval_configs:
            already_run[eval_config.id] = {}
            for run_config in self.run_configs or []:
                already_run[eval_config.id][run_config.id] = set()
            for run in eval_config.runs(readonly=True):
                if (
                    run.task_run_config_id is not None
                    and run.task_run_config_id in already_run[eval_config.id]
                ):
                    already_run[eval_config.id][run.task_run_config_id].add(
                        run.dataset_id
                    )

        return [
            EvalJob(
                item=task_run,
                task_run_config=run_config,
                type="task_run_eval",
                eval_config=eval_config,
            )
            for task_run in self.task.runs(readonly=True)
            if filter(task_run)
            for eval_config in self.eval_configs
            for run_config in self.run_configs or []
            if task_run.id not in already_run[eval_config.id][run_config.id]
        ]

    async def run(self, concurrency: int = 25) -> AsyncGenerator[Progress, None]:
        """
        Runs the configured eval run with parallel workers and yields progress updates.
        """
        jobs = self.collect_tasks()

        runner = AsyncJobRunner(
            concurrency=concurrency,
            jobs=jobs,
            run_job_fn=self.run_job,
        )
        async for progress in runner.run():
            yield progress

    async def run_job(self, job: EvalJob) -> bool:
        try:
            # Create the evaluator for this eval config/run config pair
            evaluator = eval_adapter_from_type(job.eval_config.config_type)(
                job.eval_config,
                job.task_run_config.run_config_properties
                if job.task_run_config
                else None,
            )
            if not isinstance(evaluator, BaseEval):
                raise ValueError("Not able to create evaluator from eval config")

            task_output: str | None = None
            reference_answer: str | None = None
            trace: str | None = None
            scores: EvalScores | None = None
            intermediate_outputs: Dict[str, str] | None = None
            task_run_usage: Usage | None = None
            if job.type == "eval_config_eval":
                # Eval config eval, we use the saved input from the task run, not invoking the task again
                scores, intermediate_outputs = await evaluator.run_eval(job.item)
                task_output = job.item.output.output
                task_run_usage = job.item.usage
            else:
                # Task run eval, we invoke the task again to get a fresh output
                (
                    result_task_run,
                    scores,
                    intermediate_outputs,
                ) = await evaluator.run_task_and_eval(job.item)
                task_output = result_task_run.output.output
                task_run_usage = result_task_run.usage

                parent_eval = job.eval_config.parent_eval()
                if (
                    parent_eval
                    and parent_eval.evaluation_data_type == EvalDataType.full_trace
                    and result_task_run.trace
                ):
                    trace = json.dumps(result_task_run.trace, indent=2)

                if (
                    parent_eval
                    and parent_eval.evaluation_data_type
                    == EvalDataType.reference_answer
                ):
                    reference_answer = job.item.output.output

            # Save the job result
            eval_run = EvalRun(
                parent=job.eval_config,
                task_run_config_id=job.task_run_config.id
                if job.task_run_config
                else None,
                dataset_id=job.item.id,
                eval_config_eval=job.type == "eval_config_eval",
                scores=scores,
                input=job.item.input,
                output=task_output,
                reference_answer=reference_answer,
                intermediate_outputs=intermediate_outputs,
                task_run_trace=trace,
                task_run_usage=task_run_usage,
            )
            eval_run.save_to_file()

            return True
        except Exception as e:
            logger.error(
                f"Error running eval job for dataset item {job.item.id}: {e}",
                exc_info=True,
            )
            return False
logger = <Logger kiln_ai.adapters.eval.eval_runner (WARNING)>
@dataclass
class EvalJob:
EvalJob(item: kiln_ai.datamodel.TaskRun, type: Literal['task_run_eval', 'eval_config_eval'], eval_config: kiln_ai.datamodel.eval.EvalConfig, task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None)
type: Literal['task_run_eval', 'eval_config_eval']
task_run_config: kiln_ai.datamodel.task.TaskRunConfig | None = None
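
The source comment on these fields notes which ones each job type needs. A minimal sketch of the two shapes, assuming some_task_run, some_eval_config, and some_run_config are existing Kiln datamodel objects loaded elsewhere (placeholder names, not part of this module):

from kiln_ai.adapters.eval.eval_runner import EvalJob

# "eval_config_eval": re-score an existing task run; task_run_config stays None.
config_eval_job = EvalJob(
    item=some_task_run,            # placeholder TaskRun
    type="eval_config_eval",
    eval_config=some_eval_config,  # placeholder EvalConfig
)

# "task_run_eval": generate fresh output with a run config, then score it.
task_run_eval_job = EvalJob(
    item=some_task_run,
    type="task_run_eval",
    eval_config=some_eval_config,
    task_run_config=some_run_config,  # placeholder TaskRunConfig
)
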
class EvalRunner:

Runs an eval. Async execution is supported to make it faster when using remote/fast model providers.

Can run an eval in 2 modes:

  1) eval_config_eval: evaluate an eval config using existing dataset items.
  2) task_run_eval: evaluate a range of task run configs, generating new run output using existing dataset item input.
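
A hedged construction sketch for both modes; eval_configs and run_configs stand in for lists of EvalConfig and TaskRunConfig objects loaded from your Kiln project (placeholders, not defined by this module):

from kiln_ai.adapters.eval.eval_runner import EvalRunner

# Mode 1: score existing dataset items with each eval config.
config_eval_runner = EvalRunner(
    eval_configs=eval_configs,  # placeholder: List[EvalConfig] sharing one parent eval
    run_configs=None,           # must be None in this mode
    eval_run_type="eval_config_eval",
)

# Mode 2: regenerate output with each run config, then score it.
task_run_eval_runner = EvalRunner(
    eval_configs=eval_configs,
    run_configs=run_configs,    # placeholder: List[TaskRunConfig] for the same task
    eval_run_type="task_run_eval",
)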

EvalRunner(eval_configs: List[kiln_ai.datamodel.eval.EvalConfig], run_configs: Optional[List[kiln_ai.datamodel.task.TaskRunConfig]], eval_run_type: Literal['eval_config_eval', 'task_run_eval'])
eval_run_type
eval_configs
run_configs
task
eval
def collect_tasks(self) -> List[EvalJob]:
def collect_tasks_for_eval_config_eval(self, eval_configs_filter_id: DatasetFilterId) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "eval_config_eval", using existing dataset run data (input/output).

The tasks:

  • should be in the eval config set filter
  • should not have already been run for this eval config + dataset item pair
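
The dedup rule in the bullets above amounts to one set lookup per eval config. A simplified, self-contained sketch with plain ids in place of the Kiln datamodel objects:

# Plain-id stand-ins for dataset items, eval configs, and previously saved EvalRuns.
dataset_item_ids = ["item_1", "item_2", "item_3"]
eval_config_ids = ["config_a", "config_b"]
already_run = {
    "config_a": {"item_1"},  # config_a has already scored item_1
    "config_b": set(),
}

jobs = [
    (item_id, config_id)
    for item_id in dataset_item_ids  # dataset filter step omitted for brevity
    for config_id in eval_config_ids
    if item_id not in already_run[config_id]
]
print(jobs)  # every (dataset item, eval config) pair not yet scored
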
def collect_tasks_for_task_run_eval(self) -> List[EvalJob]:

Collect all jobs for this run, excluding any that have already been run.

This variant is used for mode "task_run_eval", generating new run output using existing dataset item input.

The tasks:

  • should be in the eval set filter
  • should not have already been run for this eval config + run config + dataset item
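
Here the dedup index is keyed by both eval config and run config, so a dataset item already scored under one run config still produces jobs for the others. A simplified sketch, again with plain ids:

# already_run[eval_config_id][run_config_id] -> dataset ids already scored for that pair.
already_run = {
    "config_a": {"run_cfg_1": {"item_1"}, "run_cfg_2": set()},
}

def needs_job(eval_config_id: str, run_config_id: str, dataset_id: str) -> bool:
    return dataset_id not in already_run[eval_config_id][run_config_id]

print(needs_job("config_a", "run_cfg_1", "item_1"))  # False: already evaluated
print(needs_job("config_a", "run_cfg_2", "item_1"))  # True: a new job is needed
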
async def run(self, concurrency: int = 25) -> AsyncGenerator[kiln_ai.utils.async_job_runner.Progress, None]:

Runs the configured eval run with parallel workers and yields progress updates.
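
Since run() is an async generator, you drive it with async for inside an event loop. A usage sketch, assuming runner is an EvalRunner constructed as above (a placeholder here); the exact fields of each Progress update are defined in kiln_ai.utils.async_job_runner:

import asyncio

async def run_all(runner) -> None:
    # Each yielded Progress reflects jobs completed so far across the worker pool.
    async for progress in runner.run(concurrency=10):
        print(progress)

asyncio.run(run_all(runner))  # `runner` is a placeholder EvalRunner instance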

async def run_job(self, job: EvalJob) -> bool: