kiln_ai.datamodel

See our docs for details about our datamodel classes and hierarchy:

Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html

User docs: https://docs.kiln.tech/developers/kiln-datamodel

 1"""
 2See our docs for details about our datamodel classes and hierarchy:
 3
 4Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html
 5
 6User docs: https://docs.kiln.tech/developers/kiln-datamodel
 7"""
 8
 9# This component uses "flat" imports so we don't have too much internal structure exposed in the API.
10# for example you can just `from datamodel import Task, Project` instead of `from datamodel.task import Task; from datamodel.project import Project`
11
12from __future__ import annotations
13
14from kiln_ai.datamodel import (
15    chunk,
16    dataset_split,
17    embedding,
18    eval,
19    extraction,
20    rag,
21    reranker,
22    strict_mode,
23)
24from kiln_ai.datamodel.basemodel import generate_model_id
25from kiln_ai.datamodel.datamodel_enums import (
26    FeedbackSource,
27    FineTuneStatusType,
28    Priority,
29    StructuredOutputMode,
30    TaskOutputRatingType,
31)
32from kiln_ai.datamodel.dataset_split import DatasetSplit, DatasetSplitDefinition
33from kiln_ai.datamodel.external_tool_server import ExternalToolServer
34from kiln_ai.datamodel.feedback import Feedback
35from kiln_ai.datamodel.finetune import Finetune
36from kiln_ai.datamodel.project import Project
37from kiln_ai.datamodel.prompt import BasePrompt, Prompt
38from kiln_ai.datamodel.prompt_id import (
39    PromptGenerators,
40    PromptId,
41    prompt_generator_values,
42)
43from kiln_ai.datamodel.prompt_optimization_job import PromptOptimizationJob
44from kiln_ai.datamodel.skill import Skill
45from kiln_ai.datamodel.task import Task, TaskRequirement
46from kiln_ai.datamodel.task_output import (
47    DataSource,
48    DataSourceProperty,
49    DataSourceType,
50    RequirementRating,
51    TaskOutput,
52    TaskOutputRating,
53)
54from kiln_ai.datamodel.task_run import TaskRun, Usage
55
56__all__ = [
57    "BasePrompt",
58    "DataSource",
59    "DataSourceProperty",
60    "DataSourceType",
61    "DatasetSplit",
62    "DatasetSplitDefinition",
63    "ExternalToolServer",
64    "Feedback",
65    "FeedbackSource",
66    "FineTuneStatusType",
67    "Finetune",
68    "Priority",
69    "Project",
70    "Prompt",
71    "PromptGenerators",
72    "PromptId",
73    "PromptOptimizationJob",
74    "RequirementRating",
75    "Skill",
76    "StructuredOutputMode",
77    "Task",
78    "TaskOutput",
79    "TaskOutputRating",
80    "TaskOutputRatingType",
81    "TaskRequirement",
82    "TaskRun",
83    "Usage",
84    "chunk",
85    "dataset_split",
86    "embedding",
87    "eval",
88    "extraction",
89    "generate_model_id",
90    "prompt_generator_values",
91    "rag",
92    "reranker",
93    "strict_mode",
94]
class BasePrompt(pydantic.main.BaseModel):
 7class BasePrompt(BaseModel):
 8    """
 9    A prompt for a task. This is the basic data storage format which can be used throughout a project.
10
11    The "Prompt" model name is reserved for the custom prompts parented by a task.
12    """
13
14    name: FilenameString = Field(description="The name of the prompt.")
15    description: str | None = Field(
16        default=None,
17        description="A more detailed description of the prompt.",
18    )
19    generator_id: str | None = Field(
20        default=None,
21        description="The id of the generator that created this prompt.",
22    )
23    prompt: str = Field(
24        description="The prompt for the task.",
25        min_length=1,
26    )
27    chain_of_thought_instructions: str | None = Field(
28        default=None,
29        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting. COT will not be used unless this is provided.",
30    )

A prompt for a task. This is the basic data storage format which can be used throughout a project.

The "Prompt" model name is reserved for the custom prompts parented by a task.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
description: str | None
generator_id: str | None
prompt: str
chain_of_thought_instructions: str | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class DataSource(pydantic.main.BaseModel):
195class DataSource(BaseModel):
196    """
197    Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.
198
199    Properties vary based on the source type - for synthetic/tool_call sources this includes
200    model information, for human sources this includes creator information, for file imports
201    this includes file information.
202    """
203
204    type: DataSourceType = Field(description="The type of data source.")
205    properties: Dict[str, str | int | float] = Field(
206        default={},
207        description="Properties describing the data source. For synthetic things like model. For human: the human's name. For file_import: file information.",
208    )
209    run_config: Optional[RunConfigProperties] = Field(
210        default=None,
 211        description="The run config used to generate the data, if generated by running a model in Kiln (only true for type=synthetic).",
212    )
213
214    _data_source_properties = [
215        DataSourceProperty(
216            name="created_by",
217            type=str,
218            required_for=[DataSourceType.human],
219            not_allowed_for=[
220                DataSourceType.synthetic,
221                DataSourceType.file_import,
222                DataSourceType.tool_call,
223            ],
224        ),
225        DataSourceProperty(
226            name="model_name",
227            type=str,
228            required_for=[DataSourceType.synthetic],
229            not_allowed_for=[
230                DataSourceType.human,
231                DataSourceType.file_import,
232                DataSourceType.tool_call,
233            ],
234        ),
235        DataSourceProperty(
236            name="model_provider",
237            type=str,
238            required_for=[DataSourceType.synthetic],
239            not_allowed_for=[
240                DataSourceType.human,
241                DataSourceType.file_import,
242                DataSourceType.tool_call,
243            ],
244        ),
245        DataSourceProperty(
246            name="adapter_name",
247            type=str,
248            required_for=[DataSourceType.synthetic],
249            not_allowed_for=[
250                DataSourceType.human,
251                DataSourceType.file_import,
252                DataSourceType.tool_call,
253            ],
254        ),
255        DataSourceProperty(
256            # Legacy field -- allow loading from old runs, but we shouldn't be setting it.
257            name="prompt_builder_name",
258            type=str,
259            not_allowed_for=[
260                DataSourceType.human,
261                DataSourceType.file_import,
262                DataSourceType.tool_call,
263            ],
264        ),
265        DataSourceProperty(
266            # The PromptId of the prompt. Can be a saved prompt, fine-tune, generator name, etc. See PromptId type for more details.
267            name="prompt_id",
268            type=str,
269            not_allowed_for=[
270                DataSourceType.human,
271                DataSourceType.file_import,
272                DataSourceType.tool_call,
273            ],
274        ),
275        DataSourceProperty(
276            name="file_name",
277            type=str,
278            required_for=[DataSourceType.file_import],
279            not_allowed_for=[
280                DataSourceType.human,
281                DataSourceType.synthetic,
282                DataSourceType.tool_call,
283            ],
284        ),
285    ]
286
287    @model_validator(mode="after")
288    def validate_type(self) -> "DataSource":
289        if self.type not in DataSourceType:
290            raise ValueError(f"Invalid data source type: {self.type}")
291        return self
292
293    @model_validator(mode="after")
294    def validate_properties(self) -> "DataSource":
295        for prop in self._data_source_properties:
296            # Check the property type is correct
297            if prop.name in self.properties:
298                if not isinstance(self.properties[prop.name], prop.type):
299                    raise ValueError(
300                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
301                    )
302            # Check the property is required for the data source type
303            if self.type in prop.required_for:
304                if prop.name not in self.properties:
305                    raise ValueError(
306                        f"'{prop.name}' is required for {self.type} data source"
307                    )
308            # Check the property is not allowed for the data source type
309            elif self.type in prop.not_allowed_for and prop.name in self.properties:
310                raise ValueError(
311                    f"'{prop.name}' is not allowed for {self.type} data source"
312                )
313        return self
314
315    @model_validator(mode="after")
316    def validate_no_empty_properties(self) -> Self:
317        for prop, value in self.properties.items():
318            if isinstance(value, str) and value == "":
319                raise ValueError(
320                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
321                )
322        return self

Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

Properties vary based on the source type - for synthetic/tool_call sources this includes model information, for human sources this includes creator information, for file imports this includes file information.

properties: Dict[str, str | int | float]
run_config: Optional[Annotated[Union[Annotated[kiln_ai.datamodel.run_config.KilnAgentRunConfigProperties, Tag(tag='kiln_agent')], Annotated[kiln_ai.datamodel.run_config.McpRunConfigProperties, Tag(tag='mcp')]], Discriminator(discriminator=<function _get_run_config_type at 0x7f902336c720>, custom_error_type=None, custom_error_message=None, custom_error_context=None)]]
@model_validator(mode='after')
def validate_type(self) -> DataSource:
287    @model_validator(mode="after")
288    def validate_type(self) -> "DataSource":
289        if self.type not in DataSourceType:
290            raise ValueError(f"Invalid data source type: {self.type}")
291        return self
@model_validator(mode='after')
def validate_properties(self) -> DataSource:
293    @model_validator(mode="after")
294    def validate_properties(self) -> "DataSource":
295        for prop in self._data_source_properties:
296            # Check the property type is correct
297            if prop.name in self.properties:
298                if not isinstance(self.properties[prop.name], prop.type):
299                    raise ValueError(
300                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
301                    )
302            # Check the property is required for the data source type
303            if self.type in prop.required_for:
304                if prop.name not in self.properties:
305                    raise ValueError(
306                        f"'{prop.name}' is required for {self.type} data source"
307                    )
308            # Check the property is not allowed for the data source type
309            elif self.type in prop.not_allowed_for and prop.name in self.properties:
310                raise ValueError(
311                    f"'{prop.name}' is not allowed for {self.type} data source"
312                )
313        return self
@model_validator(mode='after')
def validate_no_empty_properties(self) -> Self:
315    @model_validator(mode="after")
316    def validate_no_empty_properties(self) -> Self:
317        for prop, value in self.properties.items():
318            if isinstance(value, str) and value == "":
319                raise ValueError(
320                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
321                )
322        return self
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class DataSourceProperty(pydantic.main.BaseModel):
181class DataSourceProperty(BaseModel):
182    """
183    Defines a property that can be associated with a data source.
184
185    Includes validation rules for when properties are required or not allowed
186    based on the data source type.
187    """
188
189    name: str
190    type: Type[Union[str, int, float]]
191    required_for: List[DataSourceType] = []
192    not_allowed_for: List[DataSourceType] = []

Defines a property that can be associated with a data source.

Includes validation rules for when properties are required or not allowed based on the data source type.

name: str
type: Type[Union[str, int, float]]
required_for: List[DataSourceType]
not_allowed_for: List[DataSourceType]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class DataSourceType(builtins.str, enum.Enum):
167class DataSourceType(str, Enum):
168    """
169    The source type of a piece of data.
170
171    Human: a human created the data
172    Synthetic: a model created the data
173    """
174
175    human = "human"
176    synthetic = "synthetic"
177    file_import = "file_import"
178    tool_call = "tool_call"

The source type of a piece of data.

Human: a human created the data. Synthetic: a model created the data.

human = <DataSourceType.human: 'human'>
synthetic = <DataSourceType.synthetic: 'synthetic'>
file_import = <DataSourceType.file_import: 'file_import'>
tool_call = <DataSourceType.tool_call: 'tool_call'>
class DatasetSplit(kiln_ai.datamodel.basemodel.KilnParentedModel):
 83class DatasetSplit(KilnParentedModel):
 84    """
 85    A collection of task runs, with optional splits (train, test, validation).
 86
 87    Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.
 88
 89    Maintains a list of IDs for each split, to avoid data duplication.
 90    """
 91
 92    name: FilenameString = Field(description="The name of the dataset split.")
 93    description: str | None = Field(
 94        default=None,
 95        description="A description of the dataset for you and your team. Not used in training.",
 96    )
 97    splits: list[DatasetSplitDefinition] = Field(
 98        default_factory=list,
 99        description="The splits in the dataset.",
100    )
101    split_contents: dict[str, list[str]] = Field(
102        description="The contents of each split in the dataset. The key is the split name, and the value is a list of task run IDs.",
103    )
104    filter: DatasetFilterId | None = Field(
105        default=None,
106        description="The filter used to build the dataset.",
107    )
108
109    @model_validator(mode="after")
110    def validate_split_percentages(self) -> "DatasetSplit":
111        total = sum(split.percentage for split in self.splits)
112        if not math.isclose(total, 1.0, rel_tol=1e-9):
113            raise ValueError(f"The sum of split percentages must be 1.0 (got {total})")
114        return self
115
116    @classmethod
117    def from_task(
118        cls,
119        name: str,
120        task: "Task",
121        splits: list[DatasetSplitDefinition],
122        filter_id: DatasetFilterId = "all",
123        description: str | None = None,
124    ):
125        """
126        Build a dataset split from a task.
127        """
128        filter = dataset_filter_from_id(filter_id)
129        split_contents = cls.build_split_contents(task, splits, filter)
130        return cls(
131            parent=task,
132            name=name,
133            description=description,
134            splits=splits,
135            split_contents=split_contents,
136            filter=filter_id,
137        )
138
139    @classmethod
140    def build_split_contents(
141        cls,
142        task: "Task",
143        splits: list[DatasetSplitDefinition],
144        filter: DatasetFilter,
145    ) -> dict[str, list[str]]:
146        valid_ids = []
147        for task_run in task.runs():
148            if filter(task_run):
149                valid_ids.append(task_run.id)
150
151        # Shuffle and split by split percentage
152        random.shuffle(valid_ids)
153        split_contents = {}
154        start_idx = 0
155        remaining_items = len(valid_ids)
156
157        # Handle all splits except the last one
158        for split in splits[:-1]:
159            split_size = round(len(valid_ids) * split.percentage)
160            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
161            start_idx += split_size
162            remaining_items -= split_size
163
164        # Last split gets all remaining items (for rounding)
165        if splits:
166            split_contents[splits[-1].name] = valid_ids[start_idx:]
167
168        return split_contents
169
170    def parent_task(self) -> "Task | None":
171        # inline import to avoid circular import
172        from kiln_ai.datamodel import Task
173
174        if not isinstance(self.parent, Task):
175            return None
176        return self.parent
177
178    def missing_count(self) -> int:
179        """
180        Returns:
181            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
182        """
183        parent = self.parent_task()
184        if parent is None:
185            raise ValueError("DatasetSplit has no parent task")
186
187        runs = parent.runs(readonly=True)
188        all_ids = set(run.id for run in runs)
189        all_ids_in_splits = set()
190        for ids in self.split_contents.values():
191            all_ids_in_splits.update(ids)
192        missing = all_ids_in_splits - all_ids
193        return len(missing)
194
195    def _get_runs(self) -> list[TaskRun]:
196        """
197        Get all task runs referenced in this dataset split.
198
199        Returns:
200            list[TaskRun]: list of task runs in this dataset split
201        """
202        parent = self.parent_task()
203        if parent is None:
204            return []
205
206        runs = []
207        all_run_ids = set()
208        for run_ids in self.split_contents.values():
209            all_run_ids.update(run_ids)
210
211        # Find all runs by their IDs
212        for task_run in parent.runs(readonly=True):
213            if task_run.id in all_run_ids:
214                runs.append(task_run)
215
216        return runs
217
218    @staticmethod
219    def compute_tool_info(runs: list[TaskRun]) -> DatasetToolInfo:
220        """
221        Compute tool info from a list of task runs.
222
223        Args:
224            runs: list of task runs to analyze
225
226        Returns:
227            DatasetToolInfo: information about tools used across the task runs
228        """
229
230        has_tool_mismatch = False
231        tools: set[str] | None = None
232
233        for run in runs:
234            # Extract tools from run config, treating missing source/run_config/tools_config as empty tools
235            run_tools: set[str] = set()
236            source = run.output.source if run.output else None
237            if source is not None and isinstance(
238                source.run_config, KilnAgentRunConfigProperties
239            ):
240                tools_config = source.run_config.tools_config
241                if tools_config is not None:
242                    run_tools = set(tools_config.tools)
243
244            # First run establishes the expected tool set (including empty)
245            if tools is None:
246                tools = run_tools
247            elif run_tools != tools:
248                # Mismatch found
249                has_tool_mismatch = True
250                tools = None
251                break
252
253        # If no valid runs were processed, return empty tools
254        if tools is None:
255            if not has_tool_mismatch:
256                tools = set()
257
258        return DatasetToolInfo(
259            has_tool_mismatch=has_tool_mismatch,
260            tools=None if tools is None else sorted(tools),
261        )
262
263    def tool_info(self) -> DatasetToolInfo:
264        """
265        Helper method to compute tool info for the dataset split. Iterate through all runs in the dataset split and check the tools used in each run config.
266
267        Returns:
268            DatasetToolInfo: information about tools used across task runs in this dataset split
269        """
270        runs = self._get_runs()
271        tool_info = self.compute_tool_info(runs)
272        return tool_info

A collection of task runs, with optional splits (train, test, validation).

Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

Maintains a list of IDs for each split, to avoid data duplication.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
description: str | None
splits: list[DatasetSplitDefinition]
split_contents: dict[str, list[str]]
filter: Optional[Annotated[str, AfterValidator(func=<function <lambda> at 0x7f902336d760>)]]
@model_validator(mode='after')
def validate_split_percentages(self) -> DatasetSplit:
109    @model_validator(mode="after")
110    def validate_split_percentages(self) -> "DatasetSplit":
111        total = sum(split.percentage for split in self.splits)
112        if not math.isclose(total, 1.0, rel_tol=1e-9):
113            raise ValueError(f"The sum of split percentages must be 1.0 (got {total})")
114        return self
@classmethod
def from_task( cls, name: str, task: Task, splits: list[DatasetSplitDefinition], filter_id: Annotated[str, AfterValidator(func=<function <lambda>>)] = 'all', description: str | None = None):
116    @classmethod
117    def from_task(
118        cls,
119        name: str,
120        task: "Task",
121        splits: list[DatasetSplitDefinition],
122        filter_id: DatasetFilterId = "all",
123        description: str | None = None,
124    ):
125        """
126        Build a dataset split from a task.
127        """
128        filter = dataset_filter_from_id(filter_id)
129        split_contents = cls.build_split_contents(task, splits, filter)
130        return cls(
131            parent=task,
132            name=name,
133            description=description,
134            splits=splits,
135            split_contents=split_contents,
136            filter=filter_id,
137        )

Build a dataset split from a task.

@classmethod
def build_split_contents( cls, task: Task, splits: list[DatasetSplitDefinition], filter: kiln_ai.datamodel.dataset_filters.DatasetFilter) -> dict[str, list[str]]:
139    @classmethod
140    def build_split_contents(
141        cls,
142        task: "Task",
143        splits: list[DatasetSplitDefinition],
144        filter: DatasetFilter,
145    ) -> dict[str, list[str]]:
146        valid_ids = []
147        for task_run in task.runs():
148            if filter(task_run):
149                valid_ids.append(task_run.id)
150
151        # Shuffle and split by split percentage
152        random.shuffle(valid_ids)
153        split_contents = {}
154        start_idx = 0
155        remaining_items = len(valid_ids)
156
157        # Handle all splits except the last one
158        for split in splits[:-1]:
159            split_size = round(len(valid_ids) * split.percentage)
160            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
161            start_idx += split_size
162            remaining_items -= split_size
163
164        # Last split gets all remaining items (for rounding)
165        if splits:
166            split_contents[splits[-1].name] = valid_ids[start_idx:]
167
168        return split_contents
def parent_task(self) -> Task | None:
170    def parent_task(self) -> "Task | None":
171        # inline import to avoid circular import
172        from kiln_ai.datamodel import Task
173
174        if not isinstance(self.parent, Task):
175            return None
176        return self.parent
def missing_count(self) -> int:
178    def missing_count(self) -> int:
179        """
180        Returns:
181            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
182        """
183        parent = self.parent_task()
184        if parent is None:
185            raise ValueError("DatasetSplit has no parent task")
186
187        runs = parent.runs(readonly=True)
188        all_ids = set(run.id for run in runs)
189        all_ids_in_splits = set()
190        for ids in self.split_contents.values():
191            all_ids_in_splits.update(ids)
192        missing = all_ids_in_splits - all_ids
193        return len(missing)

Returns: int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset

@staticmethod
def compute_tool_info( runs: list[TaskRun]) -> kiln_ai.datamodel.dataset_split.DatasetToolInfo:
218    @staticmethod
219    def compute_tool_info(runs: list[TaskRun]) -> DatasetToolInfo:
220        """
221        Compute tool info from a list of task runs.
222
223        Args:
224            runs: list of task runs to analyze
225
226        Returns:
227            DatasetToolInfo: information about tools used across the task runs
228        """
229
230        has_tool_mismatch = False
231        tools: set[str] | None = None
232
233        for run in runs:
234            # Extract tools from run config, treating missing source/run_config/tools_config as empty tools
235            run_tools: set[str] = set()
236            source = run.output.source if run.output else None
237            if source is not None and isinstance(
238                source.run_config, KilnAgentRunConfigProperties
239            ):
240                tools_config = source.run_config.tools_config
241                if tools_config is not None:
242                    run_tools = set(tools_config.tools)
243
244            # First run establishes the expected tool set (including empty)
245            if tools is None:
246                tools = run_tools
247            elif run_tools != tools:
248                # Mismatch found
249                has_tool_mismatch = True
250                tools = None
251                break
252
253        # If no valid runs were processed, return empty tools
254        if tools is None:
255            if not has_tool_mismatch:
256                tools = set()
257
258        return DatasetToolInfo(
259            has_tool_mismatch=has_tool_mismatch,
260            tools=None if tools is None else sorted(tools),
261        )

Compute tool info from a list of task runs.

Args: runs: list of task runs to analyze

Returns: DatasetToolInfo: information about tools used across the task runs

def tool_info(self) -> kiln_ai.datamodel.dataset_split.DatasetToolInfo:
263    def tool_info(self) -> DatasetToolInfo:
264        """
265        Helper method to compute tool info for the dataset split. Iterate through all runs in the dataset split and check the tools used in each run config.
266
267        Returns:
268            DatasetToolInfo: information about tools used across task runs in this dataset split
269        """
270        runs = self._get_runs()
271        tool_info = self.compute_tool_info(runs)
272        return tool_info

Helper method to compute tool info for the dataset split. Iterate through all runs in the dataset split and check the tools used in each run config.

Returns: DatasetToolInfo: information about tools used across task runs in this dataset split

def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Returns the relationship name linking child models of this type to their parent (auto-generated accessor; see the method body above).

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Returns the parent model class (`Type[KilnParentModel]`) for this parented model (auto-generated accessor; see the method body above).

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class DatasetSplitDefinition(pydantic.main.BaseModel):
39class DatasetSplitDefinition(BaseModel):
40    """
41    A definition of a split in a dataset.
42
43    Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
44    """
45
46    name: FilenameString = Field(
47        description="The name of the dataset split definition."
48    )
49    description: str | None = Field(
50        default=None,
51        description="A description of the dataset for you and your team. Not used in training.",
52    )
53    percentage: float = Field(
54        ge=0.0,
55        le=1.0,
56        description="The percentage of the dataset that this split represents (between 0 and 1).",
57    )

A definition of a split in a dataset.

Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
description: str | None
percentage: float
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ExternalToolServer(kiln_ai.datamodel.basemodel.KilnParentedModel):
 64class ExternalToolServer(KilnParentedModel):
 65    """
 66    Configuration for communicating with an external MCP (Model Context Protocol) Server for LLM tool calls. External tool servers can be remote or local.
 67
 68    This model stores the necessary configuration to connect to and authenticate with
 69    external MCP servers that provide tools for LLM interactions.
 70    """
 71
 72    name: FilenameString = Field(description="The name of the external tool.")
 73    type: ToolServerType = Field(
 74        description="The type of external tool server. Remote tools are hosted on a remote server",
 75    )
 76    description: str | None = Field(
 77        default=None,
 78        description="A description of the external tool for you and your team. Will not be used in prompts/training/validation.",
 79    )
 80
 81    properties: (
 82        LocalServerProperties | RemoteServerProperties | KilnTaskServerProperties
 83    ) = Field(
 84        description="Configuration properties specific to the tool type.",
 85    )
 86
 87    # Private variable to store unsaved secrets
 88    _unsaved_secrets: dict[str, str] = PrivateAttr(default_factory=dict)
 89
 90    def model_post_init(self, __context: Any) -> None:
 91        # Process secrets after initialization (pydantic v2 hook)
 92        self._process_secrets_from_properties()
 93
 94    def _process_secrets_from_properties(self) -> None:
 95        """
 96        Extract secrets from properties and move them to _unsaved_secrets.
 97        This removes secrets from the properties dict so they aren't saved to file.
 98        Clears existing _unsaved_secrets first to handle property updates correctly.
 99        """
100        # Clear existing unsaved secrets since we're reprocessing
101        self._unsaved_secrets.clear()
102
103        secret_keys = self.get_secret_keys()
104
105        if not secret_keys:
106            return
107
108        # Extract secret values from properties based on server type
109        match self.type:
110            case ToolServerType.remote_mcp:
111                headers = self.properties.get("headers", {})
112                for key_name in secret_keys:
113                    if key_name in headers:
114                        self._unsaved_secrets[key_name] = headers[key_name]
115                        # Remove from headers immediately so they are not saved to file
116                        del headers[key_name]
117
118            case ToolServerType.local_mcp:
119                env_vars = self.properties.get("env_vars", {})
120                for key_name in secret_keys:
121                    if key_name in env_vars:
122                        self._unsaved_secrets[key_name] = env_vars[key_name]
123                        # Remove from env_vars immediately so they are not saved to file
124                        del env_vars[key_name]
125
126            case ToolServerType.kiln_task:
127                pass
128
129            case _:
130                raise_exhaustive_enum_error(self.type)
131
132    def __setattr__(self, name: str, value: Any) -> None:
133        """
134        Override __setattr__ to process secrets whenever properties are updated.
135        """
136        super().__setattr__(name, value)
137
138        # Process secrets whenever properties are updated
139        if name == "properties":
140            self._process_secrets_from_properties()
141
142    # Validation Helpers
143
144    @classmethod
145    def check_server_url(cls, server_url: str) -> None:
146        """Validate Server URL"""
147        if not isinstance(server_url, str):
148            raise ValueError("Server URL must be a string")
149
150        # Check for leading whitespace in URL
151        if server_url != server_url.lstrip():
152            raise ValueError("Server URL must not have leading whitespace")
153
154        parsed_url = urlparse(server_url)
155        if not parsed_url.netloc:
156            raise ValueError("Server URL is not a valid URL")
157        if parsed_url.scheme not in ["http", "https"]:
158            raise ValueError("Server URL must start with http:// or https://")
159
160    @classmethod
161    def check_headers(cls, headers: dict) -> None:
162        """Validate Headers"""
163        if not isinstance(headers, dict):
164            raise ValueError("headers must be a dictionary")
165
166        for key, value in headers.items():
167            if not key:
168                raise ValueError("Header name is required")
169            if not value:
170                raise ValueError("Header value is required")
171
172            # Reject invalid header names and CR/LF in names/values
173            token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
174            if not token_re.match(key):
175                raise ValueError(f'Invalid header name: "{key}"')
176            if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
177                raise ValueError(
178                    "Header names/values must not contain invalid characters"
179                )
180
181    @classmethod
182    def check_secret_keys(
183        cls, secret_keys: list, key_type: str, tool_type: str
184    ) -> None:
185        """Validate Secret Keys (generic method for both header and env var keys)"""
186        if not isinstance(secret_keys, list):
187            raise ValueError(
188                f"{key_type} must be a list for external tools of type '{tool_type}'"
189            )
190        if not all(isinstance(k, str) for k in secret_keys):
191            raise ValueError(f"{key_type} must contain only strings")
192        if not all(key for key in secret_keys):
193            raise ValueError("Secret key is required")
194
195    @classmethod
196    def check_env_vars(cls, env_vars: dict) -> None:
197        """Validate Environment Variables"""
198        if not isinstance(env_vars, dict):
199            raise ValueError("environment variables must be a dictionary")
200
201        # Validate env_vars keys are in the correct format for Environment Variables
202        # According to POSIX specification, environment variable names must:
203        # - Start with a letter (a-z, A-Z) or underscore (_)
204        # - Contain only ASCII letters, digits, and underscores
205        for key, _ in env_vars.items():
206            if not key or not (
207                key[0].isascii() and (key[0].isalpha() or key[0] == "_")
208            ):
209                raise ValueError(
210                    f"Invalid environment variable key: {key}. Must start with a letter or underscore."
211                )
212
213            if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
214                raise ValueError(
215                    f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
216                )
217
218    @classmethod
219    def type_from_data(cls, data: dict) -> ToolServerType:
220        """Get the tool server type from the data for the validators"""
221        raw_type = data.get("type")
222        if raw_type is None:
223            raise ValueError("type is required")
224        try:
225            return ToolServerType(raw_type)
226        except ValueError:
227            valid_types = ", ".join(type.value for type in ToolServerType)
228            raise ValueError(f"type must be one of: {valid_types}")
229
230    @model_validator(mode="before")
231    def upgrade_old_properties(cls, data: dict) -> dict:
232        """
233        Upgrade properties for backwards compatibility.
234        """
235        properties = data.get("properties")
236        if properties is not None and "is_archived" not in properties:
237            # Add is_archived field with default value back to data
238            properties["is_archived"] = False
239            data["properties"] = properties
240        return data
241
242    @model_validator(mode="before")
243    def validate_required_fields(cls, data: dict) -> dict:
244        """Validate that each tool type has the required configuration."""
245        server_type = ExternalToolServer.type_from_data(data)
246        properties = data.get("properties", {})
247
248        match server_type:
249            case ToolServerType.remote_mcp:
250                server_url = properties.get("server_url", None)
251                if server_url is None:
252                    raise ValueError(
253                        "Server URL is required to connect to a remote MCP server"
254                    )
255                ExternalToolServer.check_server_url(server_url)
256
257            case ToolServerType.local_mcp:
258                command = properties.get("command", None)
259                if command is None:
260                    raise ValueError("command is required to start a local MCP server")
261                if not isinstance(command, str):
262                    raise ValueError(
263                        "command must be a string to start a local MCP server"
264                    )
265                # Reject empty/whitespace-only command strings
266                if command.strip() == "":
267                    raise ValueError("command must be a non-empty string")
268
269                args = properties.get("args", None)
270                if args is not None:
271                    if not isinstance(args, list):
272                        raise ValueError(
273                            "arguments must be a list to start a local MCP server"
274                        )
275
276            case ToolServerType.kiln_task:
277                tool_name_validator(properties.get("name", ""))
278                err_msg_prefix = "Kiln task server properties:"
279                validate_return_dict_prop(
280                    properties, "description", str, err_msg_prefix
281                )
282                description = properties.get("description", "")
283                if len(description) > 128:
284                    raise ValueError("description must be 128 characters or less")
285                validate_return_dict_prop(
286                    properties, "is_archived", bool, err_msg_prefix
287                )
288                validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
289                validate_return_dict_prop(
290                    properties, "run_config_id", str, err_msg_prefix
291                )
292
293            case _:
294                # Type checking will catch missing cases
295                raise_exhaustive_enum_error(server_type)
296        return data
297
298    @model_validator(mode="before")
299    def validate_headers_and_env_vars(cls, data: dict) -> dict:
300        """
301        Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped
302        """
303        type = ExternalToolServer.type_from_data(data)
304
305        properties = data.get("properties", {})
306        if properties is None:
307            raise ValueError("properties is required")
308
309        match type:
310            case ToolServerType.remote_mcp:
311                # Validate headers
312                headers = properties.get("headers", None)
313                if headers is not None:
314                    ExternalToolServer.check_headers(headers)
315
316                # Secret header keys are optional, validate if they are set
317                secret_header_keys = properties.get("secret_header_keys", None)
318                if secret_header_keys is not None:
319                    ExternalToolServer.check_secret_keys(
320                        secret_header_keys, "secret_header_keys", "remote_mcp"
321                    )
322
323            case ToolServerType.local_mcp:
324                # Validate secret environment variable keys
325                env_vars = properties.get("env_vars", {})
326                if env_vars is not None:
327                    ExternalToolServer.check_env_vars(env_vars)
328
329                # Secret env var keys are optional, but if they are set, they must be a list of strings
330                secret_env_var_keys = properties.get("secret_env_var_keys", None)
331                if secret_env_var_keys is not None:
332                    ExternalToolServer.check_secret_keys(
333                        secret_env_var_keys, "secret_env_var_keys", "local_mcp"
334                    )
335
336            case ToolServerType.kiln_task:
337                pass
338
339            case _:
340                raise_exhaustive_enum_error(type)
341
342        return data
343
344    def get_secret_keys(self) -> list[str]:
345        """
346        Get the list of secret key names based on server type.
347
348        Returns:
349            List of secret key names (header names for remote, env var names for local)
350        """
351        match self.type:
352            case ToolServerType.remote_mcp:
353                return self.properties.get("secret_header_keys", [])
354            case ToolServerType.local_mcp:
355                return self.properties.get("secret_env_var_keys", [])
356            case ToolServerType.kiln_task:
357                return []
358            case _:
359                raise_exhaustive_enum_error(self.type)
360
361    def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
362        """
363        Retrieve secrets from configuration system or in-memory storage.
364        Automatically determines which secret keys to retrieve based on the server type.
365        Config secrets take precedence over unsaved secrets.
366
367        Returns:
368            Tuple of (secrets_dict, missing_secrets_list) where:
369            - secrets_dict: Dictionary mapping key names to their secret values
370            - missing_secrets_list: List of secret key names that are missing values
371        """
372        secrets = {}
373        missing_secrets = []
374        secret_keys = self.get_secret_keys()
375
376        if secret_keys and len(secret_keys) > 0:
377            config = Config.shared()
378            mcp_secrets = config.get_value(MCP_SECRETS_KEY)
379
380            for key_name in secret_keys:
381                secret_value = None
382
383                # First check config secrets (persistent storage), key is mcp_server_id::key_name
384                secret_key = self._config_secret_key(key_name)
385                secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None
386
387                # Fall back to unsaved secrets (in-memory storage)
388                if (
389                    not secret_value
390                    and hasattr(self, "_unsaved_secrets")
391                    and key_name in self._unsaved_secrets
392                ):
393                    secret_value = self._unsaved_secrets[key_name]
394
395                if secret_value:
396                    secrets[key_name] = secret_value
397                else:
398                    missing_secrets.append(key_name)
399
400        return secrets, missing_secrets
401
402    def _save_secrets(self) -> None:
403        """
404        Save unsaved secrets to the configuration system.
405        """
406        secret_keys = self.get_secret_keys()
407
408        # No secrets to save
409        if not secret_keys:
410            return
411
412        if self.id is None:
413            raise ValueError("Server ID cannot be None when saving secrets")
414
415        # Check if secrets are already saved
416        if not hasattr(self, "_unsaved_secrets") or not self._unsaved_secrets:
417            return
418
419        config = Config.shared()
420        mcp_secrets: dict[str, str] = config.get_value(MCP_SECRETS_KEY) or {}
421
422        # Store secrets with the pattern: mcp_server_id::key_name
423        for key_name, secret_value in self._unsaved_secrets.items():
424            secret_key = self._config_secret_key(key_name)
425            mcp_secrets[secret_key] = secret_value
426
427        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})
428
429        # Clear unsaved secrets after saving
430        self._unsaved_secrets.clear()
431
432    def delete_secrets(self) -> None:
433        """
434        Delete all secrets for this tool server from the configuration system.
435        """
436        secret_keys = self.get_secret_keys()
437
438        config = Config.shared()
439        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()
440
441        # Remove secrets with the pattern: mcp_server_id::key_name
442        for key_name in secret_keys:
443            secret_key = self._config_secret_key(key_name)
444            if secret_key in mcp_secrets:
445                del mcp_secrets[secret_key]
446
447        # Always call update_settings to maintain consistency with the old behavior
448        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})
449
450    def save_to_file(self) -> None:
451        """
452        Override save_to_file to automatically save any unsaved secrets before saving to file.
453
454        This ensures that secrets are always saved when the object is saved,
455        preventing the issue where secrets could be lost if save_to_file is called
456        without explicitly saving secrets first.
457        """
458        # Save any unsaved secrets first
459        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
460            self._save_secrets()
461
462        # Call the parent save_to_file method
463        super().save_to_file()
464
465    #  Internal helpers
466
467    def _config_secret_key(self, key_name: str) -> str:
468        """
469        Generate the secret key pattern for storing/retrieving secrets.
470
471        Args:
472            key_name: The name of the secret key
473
474        Returns:
475            The formatted secret key: "{server_id}::{key_name}"
476        """
477        return f"{self.id}::{key_name}"

Configuration for communicating with an external MCP (Model Context Protocol) Server for LLM tool calls. External tool servers can be remote or local.

This model stores the necessary configuration to connect to and authenticate with external MCP servers that provide tools for LLM interactions.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
type: kiln_ai.datamodel.external_tool_server.ToolServerType
description: str | None
properties: kiln_ai.datamodel.external_tool_server.LocalServerProperties | kiln_ai.datamodel.external_tool_server.RemoteServerProperties | kiln_ai.datamodel.external_tool_server.KilnTaskServerProperties
def model_post_init(self, _ExternalToolServer__context: Any) -> None:
90    def model_post_init(self, __context: Any) -> None:
91        # Process secrets after initialization (pydantic v2 hook)
92        self._process_secrets_from_properties()

Pydantic v2 post-initialization hook.

Processes secrets from the properties dict after model initialization, moving them into in-memory storage so they are not saved to file.

@classmethod
def check_server_url(cls, server_url: str) -> None:
144    @classmethod
145    def check_server_url(cls, server_url: str) -> None:
146        """Validate Server URL"""
147        if not isinstance(server_url, str):
148            raise ValueError("Server URL must be a string")
149
150        # Check for leading whitespace in URL
151        if server_url != server_url.lstrip():
152            raise ValueError("Server URL must not have leading whitespace")
153
154        parsed_url = urlparse(server_url)
155        if not parsed_url.netloc:
156            raise ValueError("Server URL is not a valid URL")
157        if parsed_url.scheme not in ["http", "https"]:
158            raise ValueError("Server URL must start with http:// or https://")

Validate Server URL

@classmethod
def check_headers(cls, headers: dict) -> None:
160    @classmethod
161    def check_headers(cls, headers: dict) -> None:
162        """Validate Headers"""
163        if not isinstance(headers, dict):
164            raise ValueError("headers must be a dictionary")
165
166        for key, value in headers.items():
167            if not key:
168                raise ValueError("Header name is required")
169            if not value:
170                raise ValueError("Header value is required")
171
172            # Reject invalid header names and CR/LF in names/values
173            token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
174            if not token_re.match(key):
175                raise ValueError(f'Invalid header name: "{key}"')
176            if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
177                raise ValueError(
178                    "Header names/values must not contain invalid characters"
179                )

Validate Headers

@classmethod
def check_secret_keys(cls, secret_keys: list, key_type: str, tool_type: str) -> None:
181    @classmethod
182    def check_secret_keys(
183        cls, secret_keys: list, key_type: str, tool_type: str
184    ) -> None:
185        """Validate Secret Keys (generic method for both header and env var keys)"""
186        if not isinstance(secret_keys, list):
187            raise ValueError(
188                f"{key_type} must be a list for external tools of type '{tool_type}'"
189            )
190        if not all(isinstance(k, str) for k in secret_keys):
191            raise ValueError(f"{key_type} must contain only strings")
192        if not all(key for key in secret_keys):
193            raise ValueError("Secret key is required")

Validate Secret Keys (generic method for both header and env var keys)

@classmethod
def check_env_vars(cls, env_vars: dict) -> None:
195    @classmethod
196    def check_env_vars(cls, env_vars: dict) -> None:
197        """Validate Environment Variables"""
198        if not isinstance(env_vars, dict):
199            raise ValueError("environment variables must be a dictionary")
200
201        # Validate env_vars keys are in the correct format for Environment Variables
202        # According to POSIX specification, environment variable names must:
203        # - Start with a letter (a-z, A-Z) or underscore (_)
204        # - Contain only ASCII letters, digits, and underscores
205        for key, _ in env_vars.items():
206            if not key or not (
207                key[0].isascii() and (key[0].isalpha() or key[0] == "_")
208            ):
209                raise ValueError(
210                    f"Invalid environment variable key: {key}. Must start with a letter or underscore."
211                )
212
213            if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
214                raise ValueError(
215                    f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
216                )

Validate Environment Variables

@classmethod
def type_from_data(cls, data: dict) -> kiln_ai.datamodel.external_tool_server.ToolServerType:
218    @classmethod
219    def type_from_data(cls, data: dict) -> ToolServerType:
220        """Get the tool server type from the data for the validators"""
221        raw_type = data.get("type")
222        if raw_type is None:
223            raise ValueError("type is required")
224        try:
225            return ToolServerType(raw_type)
226        except ValueError:
227            valid_types = ", ".join(type.value for type in ToolServerType)
228            raise ValueError(f"type must be one of: {valid_types}")

Get the tool server type from the data for the validators

@model_validator(mode='before')
def upgrade_old_properties(cls, data: dict) -> dict:
230    @model_validator(mode="before")
231    def upgrade_old_properties(cls, data: dict) -> dict:
232        """
233        Upgrade properties for backwards compatibility.
234        """
235        properties = data.get("properties")
236        if properties is not None and "is_archived" not in properties:
237            # Add is_archived field with default value back to data
238            properties["is_archived"] = False
239            data["properties"] = properties
240        return data

Upgrade properties for backwards compatibility.

@model_validator(mode='before')
def validate_required_fields(cls, data: dict) -> dict:
242    @model_validator(mode="before")
243    def validate_required_fields(cls, data: dict) -> dict:
244        """Validate that each tool type has the required configuration."""
245        server_type = ExternalToolServer.type_from_data(data)
246        properties = data.get("properties", {})
247
248        match server_type:
249            case ToolServerType.remote_mcp:
250                server_url = properties.get("server_url", None)
251                if server_url is None:
252                    raise ValueError(
253                        "Server URL is required to connect to a remote MCP server"
254                    )
255                ExternalToolServer.check_server_url(server_url)
256
257            case ToolServerType.local_mcp:
258                command = properties.get("command", None)
259                if command is None:
260                    raise ValueError("command is required to start a local MCP server")
261                if not isinstance(command, str):
262                    raise ValueError(
263                        "command must be a string to start a local MCP server"
264                    )
265                # Reject empty/whitespace-only command strings
266                if command.strip() == "":
267                    raise ValueError("command must be a non-empty string")
268
269                args = properties.get("args", None)
270                if args is not None:
271                    if not isinstance(args, list):
272                        raise ValueError(
273                            "arguments must be a list to start a local MCP server"
274                        )
275
276            case ToolServerType.kiln_task:
277                tool_name_validator(properties.get("name", ""))
278                err_msg_prefix = "Kiln task server properties:"
279                validate_return_dict_prop(
280                    properties, "description", str, err_msg_prefix
281                )
282                description = properties.get("description", "")
283                if len(description) > 128:
284                    raise ValueError("description must be 128 characters or less")
285                validate_return_dict_prop(
286                    properties, "is_archived", bool, err_msg_prefix
287                )
288                validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
289                validate_return_dict_prop(
290                    properties, "run_config_id", str, err_msg_prefix
291                )
292
293            case _:
294                # Type checking will catch missing cases
295                raise_exhaustive_enum_error(server_type)
296        return data

Validate that each tool type has the required configuration.

@model_validator(mode='before')
def validate_headers_and_env_vars(cls, data: dict) -> dict:
298    @model_validator(mode="before")
299    def validate_headers_and_env_vars(cls, data: dict) -> dict:
300        """
301        Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped
302        """
303        type = ExternalToolServer.type_from_data(data)
304
305        properties = data.get("properties", {})
306        if properties is None:
307            raise ValueError("properties is required")
308
309        match type:
310            case ToolServerType.remote_mcp:
311                # Validate headers
312                headers = properties.get("headers", None)
313                if headers is not None:
314                    ExternalToolServer.check_headers(headers)
315
316                # Secret header keys are optional, validate if they are set
317                secret_header_keys = properties.get("secret_header_keys", None)
318                if secret_header_keys is not None:
319                    ExternalToolServer.check_secret_keys(
320                        secret_header_keys, "secret_header_keys", "remote_mcp"
321                    )
322
323            case ToolServerType.local_mcp:
324                # Validate secret environment variable keys
325                env_vars = properties.get("env_vars", {})
326                if env_vars is not None:
327                    ExternalToolServer.check_env_vars(env_vars)
328
329                # Secret env var keys are optional, but if they are set, they must be a list of strings
330                secret_env_var_keys = properties.get("secret_env_var_keys", None)
331                if secret_env_var_keys is not None:
332                    ExternalToolServer.check_secret_keys(
333                        secret_env_var_keys, "secret_env_var_keys", "local_mcp"
334                    )
335
336            case ToolServerType.kiln_task:
337                pass
338
339            case _:
340                raise_exhaustive_enum_error(type)
341
342        return data

Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped

def get_secret_keys(self) -> list[str]:
344    def get_secret_keys(self) -> list[str]:
345        """
346        Get the list of secret key names based on server type.
347
348        Returns:
349            List of secret key names (header names for remote, env var names for local)
350        """
351        match self.type:
352            case ToolServerType.remote_mcp:
353                return self.properties.get("secret_header_keys", [])
354            case ToolServerType.local_mcp:
355                return self.properties.get("secret_env_var_keys", [])
356            case ToolServerType.kiln_task:
357                return []
358            case _:
359                raise_exhaustive_enum_error(self.type)

Get the list of secret key names based on server type.

Returns: List of secret key names (header names for remote, env var names for local)

def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
361    def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
362        """
363        Retrieve secrets from configuration system or in-memory storage.
364        Automatically determines which secret keys to retrieve based on the server type.
365        Config secrets take precedence over unsaved secrets.
366
367        Returns:
368            Tuple of (secrets_dict, missing_secrets_list) where:
369            - secrets_dict: Dictionary mapping key names to their secret values
370            - missing_secrets_list: List of secret key names that are missing values
371        """
372        secrets = {}
373        missing_secrets = []
374        secret_keys = self.get_secret_keys()
375
376        if secret_keys and len(secret_keys) > 0:
377            config = Config.shared()
378            mcp_secrets = config.get_value(MCP_SECRETS_KEY)
379
380            for key_name in secret_keys:
381                secret_value = None
382
383                # First check config secrets (persistent storage), key is mcp_server_id::key_name
384                secret_key = self._config_secret_key(key_name)
385                secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None
386
387                # Fall back to unsaved secrets (in-memory storage)
388                if (
389                    not secret_value
390                    and hasattr(self, "_unsaved_secrets")
391                    and key_name in self._unsaved_secrets
392                ):
393                    secret_value = self._unsaved_secrets[key_name]
394
395                if secret_value:
396                    secrets[key_name] = secret_value
397                else:
398                    missing_secrets.append(key_name)
399
400        return secrets, missing_secrets

Retrieve secrets from configuration system or in-memory storage. Automatically determines which secret keys to retrieve based on the server type. Config secrets take precedence over unsaved secrets.

Returns: Tuple of (secrets_dict, missing_secrets_list) where: - secrets_dict: Dictionary mapping key names to their secret values - missing_secrets_list: List of secret key names that are missing values

def delete_secrets(self) -> None:
432    def delete_secrets(self) -> None:
433        """
434        Delete all secrets for this tool server from the configuration system.
435        """
436        secret_keys = self.get_secret_keys()
437
438        config = Config.shared()
439        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()
440
441        # Remove secrets with the pattern: mcp_server_id::key_name
442        for key_name in secret_keys:
443            secret_key = self._config_secret_key(key_name)
444            if secret_key in mcp_secrets:
445                del mcp_secrets[secret_key]
446
447        # Always call update_settings to maintain consistency with the old behavior
448        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

Delete all secrets for this tool server from the configuration system.

def save_to_file(self) -> None:
450    def save_to_file(self) -> None:
451        """
452        Override save_to_file to automatically save any unsaved secrets before saving to file.
453
454        This ensures that secrets are always saved when the object is saved,
455        preventing the issue where secrets could be lost if save_to_file is called
456        without explicitly saving secrets first.
457        """
458        # Save any unsaved secrets first
459        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
460            self._save_secrets()
461
462        # Call the parent save_to_file method
463        super().save_to_file()

Override save_to_file to automatically save any unsaved secrets before saving to file.

This ensures that secrets are always saved when the object is saved, preventing the issue where secrets could be lost if save_to_file is called without explicitly saving secrets first.

def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Dynamically generated accessor: returns the relationship name under which this child model is stored on its parent.

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Dynamically generated accessor: returns the parent model class for this relationship.

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class Feedback(kiln_ai.datamodel.basemodel.KilnParentedModel):
 8class Feedback(KilnParentedModel):
 9    """Feedback on a task run.
10
11    Supports multi-source feedback: different users, automated systems, and
12    different locations in the UI can each contribute independent feedback
13    entries on the same task run.
14    """
15
16    feedback: str = Field(
17        min_length=1,
18        description="Free-form text feedback on the task run.",
19    )
20    source: FeedbackSource = Field(
21        description="Where this feedback originated, e.g. 'run-page' or 'spec-feedback'.",
22    )

Feedback on a task run.

Supports multi-source feedback: different users, automated systems, and different locations in the UI can each contribute independent feedback entries on the same task run.

feedback: str
source: FeedbackSource
def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Dynamically generated accessor: returns the relationship name under which this child model is stored on its parent.

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Dynamically generated accessor: returns the parent model class for this relationship.

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class FeedbackSource(builtins.str, enum.Enum):
85class FeedbackSource(str, Enum):
86    """Where a piece of feedback originated.
87
88    This is an append-only enum: new sources can be added freely, but existing
89    values must never be removed or renamed so that older persisted data
90    continues to load.
91    """
92
93    run_page = "run-page"
94    spec_feedback = "spec-feedback"

Where a piece of feedback originated.

This is an append-only enum: new sources can be added freely, but existing values must never be removed or renamed so that older persisted data continues to load.

run_page = <FeedbackSource.run_page: 'run-page'>
spec_feedback = <FeedbackSource.spec_feedback: 'spec-feedback'>
class FineTuneStatusType(builtins.str, enum.Enum):
53class FineTuneStatusType(str, Enum):
54    """
55    The status type of a fine-tune job.
56    """
57
58    unknown = "unknown"
59    pending = "pending"
60    running = "running"
61    completed = "completed"
62    failed = "failed"

The status type of a fine-tune job.

unknown = <FineTuneStatusType.unknown: 'unknown'>
pending = <FineTuneStatusType.pending: 'pending'>
running = <FineTuneStatusType.running: 'running'>
completed = <FineTuneStatusType.completed: 'completed'>
failed = <FineTuneStatusType.failed: 'failed'>
class Finetune(kiln_ai.datamodel.basemodel.KilnParentedModel):
 24class Finetune(KilnParentedModel):
 25    """
 26    The Kiln fine-tune datamodel.
 27
 28    Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
 29    """
 30
 31    name: FilenameString = Field(description="The name of the fine-tune.")
 32    description: str | None = Field(
 33        default=None,
 34        description="A description of the fine-tune for you and your team. Not used in training.",
 35    )
 36    structured_output_mode: StructuredOutputMode | None = Field(
 37        default=None,
 38        description="Legacy field -- replaced by run_config.structured_output_mode. The mode to use to train the model for structured output, if it was trained with structured output. We should call the tuned model with this mode if set.",
 39    )
 40    provider: str = Field(
 41        description="The provider to use for the fine-tune (e.g. 'openai')."
 42    )
 43    base_model_id: str = Field(
 44        description="The id of the base model to use for the fine-tune. This string relates to the provider's IDs for their own models, not Kiln IDs."
 45    )
 46    provider_id: str | None = Field(
 47        default=None,
 48        description="The ID of the fine-tune job on the provider's side. May not be the same as the fine_tune_model_id.",
 49    )
 50    fine_tune_model_id: str | None = Field(
 51        default=None,
 52        description="The ID of the fine-tuned model on the provider's side. May not be the same as the provider_id.",
 53    )
 54    dataset_split_id: str = Field(
 55        description="The ID of the dataset split to use for this fine-tune.",
 56    )
 57    train_split_name: str = Field(
 58        default="train",
 59        description="The name of the training split to use for this fine-tune.",
 60    )
 61    validation_split_name: str | None = Field(
 62        default=None,
 63        description="The name of the validation split to use for this fine-tune. Optional.",
 64    )
 65    parameters: dict[str, str | int | float | bool] = Field(
 66        default={},
 67        description="The parameters to use for this fine-tune. These are provider-specific.",
 68    )
 69    # These two fields are saved exactly used for training. Even if they map exactly to a custom prompt or generator, those can change, so we want to keep a record of the training prompt.
 70    system_message: str = Field(
 71        description="The system message to use for this fine-tune.",
 72    )
 73    thinking_instructions: str | None = Field(
 74        default=None,
 75        description="The thinking instructions to use for this fine-tune. Only used when data_strategy is final_and_intermediate.",
 76    )
 77    latest_status: FineTuneStatusType = Field(
 78        default=FineTuneStatusType.unknown,
 79        description="The latest known status of this fine-tune. Not updated in real time.",
 80    )
 81    properties: Dict[str, str | int | float] = Field(
 82        default={},
 83        description="Properties of the fine-tune. Different providers may use different properties.",
 84    )
 85    data_strategy: ChatStrategy = Field(
 86        default=ChatStrategy.single_turn,
 87        description="The strategy to use for training the model. 'final_only' will only train on the final response. 'final_and_intermediate' will train on the final response and intermediate outputs (chain of thought or reasoning).",
 88    )
 89    run_config: KilnAgentRunConfigProperties | None = Field(
 90        default=None,
 91        description="The run configuration for this fine-tune.",
 92    )
 93
 94    # Workaround to return typed parent without importing Task
 95    def parent_task(self) -> Union["Task", None]:
 96        if self.parent is None or self.parent.__class__.__name__ != "Task":
 97            return None
 98        return self.parent  # type: ignore
 99
100    def nested_id(self) -> str:
101        """
102        Build the nested ID for this finetune in the format: project_id::task_id::finetune_id
103        """
104        task = self.parent_task()
105        if task is None:
106            raise ValueError("Finetune must have a parent task")
107        project = task.parent_project()
108        if project is None:
109            raise ValueError("Finetune must have a parent project")
110        return f"{project.id}::{task.id}::{self.id}"
111
112    @model_validator(mode="after")
113    def validate_thinking_instructions(self) -> Self:
114        if (
115            self.thinking_instructions is not None
116            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
117        ):
118            raise ValueError(
119                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
120            )
121        if (
122            self.thinking_instructions is None
123            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
124        ):
125            raise ValueError(
126                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
127            )
128        return self

The Kiln fine-tune datamodel.

Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
description: str | None
structured_output_mode: StructuredOutputMode | None
provider: str
base_model_id: str
provider_id: str | None
fine_tune_model_id: str | None
dataset_split_id: str
train_split_name: str
validation_split_name: str | None
parameters: dict[str, str | int | float | bool]
system_message: str
thinking_instructions: str | None
latest_status: FineTuneStatusType
properties: Dict[str, str | int | float]
data_strategy: kiln_ai.datamodel.datamodel_enums.ChatStrategy
run_config: kiln_ai.datamodel.run_config.KilnAgentRunConfigProperties | None
def parent_task(self) -> Optional[Task]:
95    def parent_task(self) -> Union["Task", None]:
96        if self.parent is None or self.parent.__class__.__name__ != "Task":
97            return None
98        return self.parent  # type: ignore
def nested_id(self) -> str:
100    def nested_id(self) -> str:
101        """
102        Build the nested ID for this finetune in the format: project_id::task_id::finetune_id
103        """
104        task = self.parent_task()
105        if task is None:
106            raise ValueError("Finetune must have a parent task")
107        project = task.parent_project()
108        if project is None:
109            raise ValueError("Finetune must have a parent project")
110        return f"{project.id}::{task.id}::{self.id}"

Build the nested ID for this finetune in the format: project_id::task_id::finetune_id

@model_validator(mode='after')
def validate_thinking_instructions(self) -> Self:
112    @model_validator(mode="after")
113    def validate_thinking_instructions(self) -> Self:
114        if (
115            self.thinking_instructions is not None
116            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
117        ):
118            raise ValueError(
119                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
120            )
121        if (
122            self.thinking_instructions is None
123            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
124        ):
125            raise ValueError(
126                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
127            )
128        return self
def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Dynamically generated accessor: returns the relationship name under which this child model is stored on its parent.

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Dynamically generated accessor: returns the parent model class for this relationship.

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class Priority(enum.IntEnum):
 9class Priority(IntEnum):
10    """Priority levels, where P0 is highest priority."""
11
12    p0 = 0
13    p1 = 1
14    p2 = 2
15    p3 = 3

Priority levels, where P0 is highest priority.

p0 = <Priority.p0: 0>
p1 = <Priority.p1: 1>
p2 = <Priority.p2: 2>
p3 = <Priority.p3: 3>
class Project(kiln_ai.datamodel.basemodel.KilnParentModel):
16class Project(
17    KilnParentModel,
18    parent_of={
19        "tasks": Task,
20        "documents": Document,
21        "extractor_configs": ExtractorConfig,
22        "chunker_configs": ChunkerConfig,
23        "embedding_configs": EmbeddingConfig,
24        "rag_configs": RagConfig,
25        "vector_store_configs": VectorStoreConfig,
26        "external_tool_servers": ExternalToolServer,
27        "reranker_configs": RerankerConfig,
28        "skills": Skill,
29    },
30):
31    """
32    A collection of related tasks.
33
34    Projects organize tasks into logical groups and provide high-level descriptions
35    of the overall goals.
36    """
37
38    name: FilenameString = Field(description="The name of the project.")
39    description: str | None = Field(
40        default=None,
41        description="A description of the project for you and your team. Will not be used in prompts/training/validation.",
42    )
43
44    # Needed for typechecking. We should fix this in KilnParentModel
45    def tasks(self, readonly: bool = False) -> list[Task]:
46        return super().tasks(readonly=readonly)  # type: ignore
47
48    def documents(self, readonly: bool = False) -> list[Document]:
49        return super().documents(readonly=readonly)  # type: ignore
50
51    def extractor_configs(self, readonly: bool = False) -> list[ExtractorConfig]:
52        return super().extractor_configs(readonly=readonly)  # type: ignore
53
54    def chunker_configs(self, readonly: bool = False) -> list[ChunkerConfig]:
55        return super().chunker_configs(readonly=readonly)  # type: ignore
56
57    def embedding_configs(self, readonly: bool = False) -> list[EmbeddingConfig]:
58        return super().embedding_configs(readonly=readonly)  # type: ignore
59
60    def vector_store_configs(self, readonly: bool = False) -> list[VectorStoreConfig]:
61        return super().vector_store_configs(readonly=readonly)  # type: ignore
62
63    def rag_configs(self, readonly: bool = False) -> list[RagConfig]:
64        return super().rag_configs(readonly=readonly)  # type: ignore
65
66    def external_tool_servers(self, readonly: bool = False) -> list[ExternalToolServer]:
67        return super().external_tool_servers(readonly=readonly)  # type: ignore
68
69    def reranker_configs(self, readonly: bool = False) -> list[RerankerConfig]:
70        return super().reranker_configs(readonly=readonly)  # type: ignore
71
72    def skills(self, readonly: bool = False) -> list[Skill]:
73        return super().skills(readonly=readonly)  # type: ignore

A collection of related tasks.

Projects organize tasks into logical groups and provide high-level descriptions of the overall goals.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
description: str | None
def tasks(self, readonly=False) -> List[Task]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def documents(self, readonly=False) -> List[kiln_ai.datamodel.extraction.Document]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def extractor_configs( self, readonly=False) -> List[kiln_ai.datamodel.extraction.ExtractorConfig]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def chunker_configs(self, readonly=False) -> List[kiln_ai.datamodel.chunk.ChunkerConfig]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def embedding_configs( self, readonly=False) -> List[kiln_ai.datamodel.embedding.EmbeddingConfig]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def vector_store_configs( self, readonly=False) -> List[kiln_ai.datamodel.vector_store.VectorStoreConfig]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def rag_configs(self, readonly=False) -> List[kiln_ai.datamodel.rag.RagConfig]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def external_tool_servers( self, readonly=False) -> List[ExternalToolServer]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def reranker_configs(self, readonly=False) -> List[kiln_ai.datamodel.reranker.RerankerConfig]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

def skills(self, readonly=False) -> List[Skill]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Dynamically generated accessor: returns all children of this type parented at this model's path.

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class Prompt(kiln_ai.datamodel.basemodel.KilnParentedModel, kiln_ai.datamodel.BasePrompt):
33class Prompt(KilnParentedModel, BasePrompt):
34    """
35    A prompt for a task. This is the custom prompt parented by a task.
36    """
37
38    pass

A prompt for a task. This is the custom prompt parented by a task.

def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Dynamically generated accessor: returns the relationship name under which this child model is stored on its parent.

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Dynamically generated accessor: returns the parent model class for this relationship.

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class PromptGenerators(builtins.str, enum.Enum):
 9class PromptGenerators(str, Enum):
10    """Built-in prompt generators that can construct a prompt from a task definition."""
11
12    SIMPLE = "simple_prompt_builder"
13    MULTI_SHOT = "multi_shot_prompt_builder"
14    FEW_SHOT = "few_shot_prompt_builder"
15    REPAIRS = "repairs_prompt_builder"
16    SIMPLE_CHAIN_OF_THOUGHT = "simple_chain_of_thought_prompt_builder"
17    FEW_SHOT_CHAIN_OF_THOUGHT = "few_shot_chain_of_thought_prompt_builder"
18    MULTI_SHOT_CHAIN_OF_THOUGHT = "multi_shot_chain_of_thought_prompt_builder"

Built-in prompt generators that can construct a prompt from a task definition.

SIMPLE = <PromptGenerators.SIMPLE: 'simple_prompt_builder'>
MULTI_SHOT = <PromptGenerators.MULTI_SHOT: 'multi_shot_prompt_builder'>
FEW_SHOT = <PromptGenerators.FEW_SHOT: 'few_shot_prompt_builder'>
REPAIRS = <PromptGenerators.REPAIRS: 'repairs_prompt_builder'>
SIMPLE_CHAIN_OF_THOUGHT = <PromptGenerators.SIMPLE_CHAIN_OF_THOUGHT: 'simple_chain_of_thought_prompt_builder'>
FEW_SHOT_CHAIN_OF_THOUGHT = <PromptGenerators.FEW_SHOT_CHAIN_OF_THOUGHT: 'few_shot_chain_of_thought_prompt_builder'>
MULTI_SHOT_CHAIN_OF_THOUGHT = <PromptGenerators.MULTI_SHOT_CHAIN_OF_THOUGHT: 'multi_shot_chain_of_thought_prompt_builder'>
PromptId = typing.Annotated[str, AfterValidator(func=<function <lambda>>)]
class PromptOptimizationJob(kiln_ai.datamodel.basemodel.KilnParentedModel):
12class PromptOptimizationJob(KilnParentedModel):
13    """
14    The Kiln prompt optimization job datamodel.
15    """
16
17    name: FilenameString = Field(description="The name of the prompt optimization job.")
18    description: str | None = Field(
19        default=None,
20        description="A description of the prompt optimization job for you and your team.",
21    )
22    job_id: str = Field(description="The ID of the job on the remote Kiln server.")
23    target_run_config_id: str = Field(
24        description="The ID of the run configuration used for this job."
25    )
26    latest_status: str = Field(
27        default="pending",
28        description="The latest known status of this prompt optimization job (pending, running, succeeded, failed, cancelled). Not updated in real time.",
29    )
30    optimized_prompt: str | None = Field(
31        default=None,
32        description="The optimized prompt result when the job succeeds.",
33    )
34    created_prompt_id: str | None = Field(
35        default=None,
36        description="The ID of the prompt created from this job's result, if any.",
37    )
38    created_run_config_id: str | None = Field(
39        default=None,
40        description="The ID of the run config created from this job's result, if any.",
41    )
42    eval_ids: list[str] = Field(
43        default_factory=list,
44        description="List of eval IDs used for this job.",
45    )
46
47    def parent_task(self) -> "Task | None":
48        """Get the parent task, with proper typing."""
49        if self.parent is None or self.parent.__class__.__name__ != "Task":
50            return None
51        return self.parent  # type: ignore

The Kiln prompt optimization job datamodel.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
description: str | None
job_id: str
target_run_config_id: str
latest_status: str
optimized_prompt: str | None
created_prompt_id: str | None
created_run_config_id: str | None
eval_ids: list[str]
def parent_task(self) -> Task | None:
47    def parent_task(self) -> "Task | None":
48        """Get the parent task, with proper typing."""
49        if self.parent is None or self.parent.__class__.__name__ != "Task":
50            return None
51        return self.parent  # type: ignore

Get the parent task, with proper typing.

def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Dynamically generated accessor: returns the relationship name under which this child model is stored on its parent.

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Returns the parent model class for this child model (generated accessor).

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class RequirementRating(pydantic.main.BaseModel):
20class RequirementRating(BaseModel):
21    """Rating for a specific requirement within a task output."""
22
23    value: float = Field(
24        description="The rating value. Interpretation depends on rating type"
25    )
26    type: TaskOutputRatingType = Field(description="The type of rating")

Rating for a specific requirement within a task output.

value: float
type: TaskOutputRatingType
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class Skill(kiln_ai.datamodel.basemodel.KilnParentedModel):
 20class Skill(KilnParentedModel):
 21    """A Skill represents reusable agent instructions following the agentskills.io specification.
 22
 23    Skills are project-level resources that can be attached to run configs.
 24    The agent discovers available skills via the skill tool description, then
 25    loads a skill's body on demand by calling skill(name="skill_name").
 26
 27    The skill's body (markdown instructions) is stored in a SKILL.md sidecar file
 28    rather than in skill.kiln, following the agentskills.io spec.
 29    """
 30
 31    name: SkillNameString = Field(
 32        description="Skill name. Kebab-case: lowercase alphanumeric with hyphens.",
 33    )
 34    description: str = Field(
 35        description="Description of what the skill does and when to use it.",
 36        min_length=1,
 37        max_length=1024,
 38    )
 39    is_archived: bool = Field(
 40        default=False,
 41        description="Whether the skill is archived. Archived skills are hidden from the UI and not available for use.",
 42    )
 43
 44    def parent_project(self) -> Union["Project", None]:
 45        if self.parent is None or self.parent.__class__.__name__ != "Project":
 46            return None
 47        return self.parent  # type: ignore
 48
 49    def skill_md_path(self) -> Path:
 50        """Path to the SKILL.md sidecar file (sibling of skill.kiln)."""
 51        if self.path is None:
 52            raise ValueError("Skill must be saved before accessing SKILL.md path")
 53        return self.path.parent / SKILL_MD_FILENAME
 54
 55    def skill_md_raw(self) -> str:
 56        """Read the full SKILL.md file content (frontmatter + body)."""
 57        md_path = self.skill_md_path()
 58        if not md_path.exists():
 59            raise FileNotFoundError(f"SKILL.md not found at {md_path}")
 60        if md_path.is_dir():
 61            raise FileNotFoundError(f"SKILL.md path is a folder, not a file: {md_path}")
 62        return md_path.read_text(encoding="utf-8")
 63
 64    def body(self) -> str:
 65        """Read the markdown body from SKILL.md (content after YAML frontmatter)."""
 66        return _parse_skill_md_body(self.skill_md_raw())
 67
 68    # -- Resources (references & assets) --
 69
 70    def references_dir(self) -> Path:
 71        if self.path is None:
 72            raise ValueError(
 73                "Skill must be saved before accessing references directory"
 74            )
 75        return self.path.parent / "references"
 76
 77    def assets_dir(self) -> Path:
 78        if self.path is None:
 79            raise ValueError("Skill must be saved before accessing assets directory")
 80        return self.path.parent / "assets"
 81
 82    def read_reference(self, relative_path: str) -> str:
 83        """Read a reference file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing."""
 84        return self._read_resource(self.references_dir(), relative_path)
 85
 86    def read_asset(self, relative_path: str) -> str:
 87        """Read an asset file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing."""
 88        return self._read_resource(self.assets_dir(), relative_path)
 89
 90    def _read_resource(self, base_dir: Path, relative_path: str) -> str:
 91        """Read a resource file, validating it resolves within base_dir and is readable text."""
 92        if not relative_path or not relative_path.strip():
 93            raise ValueError("Path cannot be empty")
 94
 95        target = base_dir / relative_path
 96        try:
 97            resolved = target.resolve()
 98            resolved.relative_to(base_dir.resolve())
 99        except ValueError:
100            raise ValueError("Path traversal is not allowed") from None
101
102        if resolved.is_dir():
103            raise ValueError(f"Path is a folder, not a file: {relative_path}")
104
105        try:
106            return resolved.read_text(encoding="utf-8")
107        except FileNotFoundError:
108            raise FileNotFoundError(
109                f"Resource file not found: {relative_path}"
110            ) from None
111        except UnicodeDecodeError:
112            raise ValueError(
113                f"File is not a readable text file: {relative_path}"
114            ) from None
115
116    def save_skill_md(self, body: str) -> None:
117        """Write SKILL.md with YAML frontmatter (name, description) + markdown body.
118
119        Reads name and description from self to keep SKILL.md in sync with skill.kiln.
120        """
121        if not body or not body.strip():
122            raise ValueError("body must be non-empty")
123        frontmatter = yaml.dump(
124            {"name": self.name, "description": self.description},
125            default_flow_style=False,
126            allow_unicode=True,
127            sort_keys=False,
128        ).rstrip("\n")
129        content = f"---\n{frontmatter}\n---\n\n{body}"
130        self.skill_md_path().write_text(content, encoding="utf-8")
131        self.references_dir().mkdir(exist_ok=True)
132        self.assets_dir().mkdir(exist_ok=True)

A Skill represents reusable agent instructions following the agentskills.io specification.

Skills are project-level resources that can be attached to run configs. The agent discovers available skills via the skill tool description, then loads a skill's body on demand by calling skill(name="skill_name").

The skill's body (markdown instructions) is stored in a SKILL.md sidecar file rather than in skill.kiln, following the agentskills.io spec.

name: Annotated[str, BeforeValidator(func=<function skill_name_validator at 0x7f90233e9300>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=64, pattern=None)]
description: str
is_archived: bool
def parent_project(self) -> Optional[Project]:
44    def parent_project(self) -> Union["Project", None]:
45        if self.parent is None or self.parent.__class__.__name__ != "Project":
46            return None
47        return self.parent  # type: ignore
def skill_md_path(self) -> pathlib.Path:
49    def skill_md_path(self) -> Path:
50        """Path to the SKILL.md sidecar file (sibling of skill.kiln)."""
51        if self.path is None:
52            raise ValueError("Skill must be saved before accessing SKILL.md path")
53        return self.path.parent / SKILL_MD_FILENAME

Path to the SKILL.md sidecar file (sibling of skill.kiln).

def skill_md_raw(self) -> str:
55    def skill_md_raw(self) -> str:
56        """Read the full SKILL.md file content (frontmatter + body)."""
57        md_path = self.skill_md_path()
58        if not md_path.exists():
59            raise FileNotFoundError(f"SKILL.md not found at {md_path}")
60        if md_path.is_dir():
61            raise FileNotFoundError(f"SKILL.md path is a folder, not a file: {md_path}")
62        return md_path.read_text(encoding="utf-8")

Read the full SKILL.md file content (frontmatter + body).

def body(self) -> str:
64    def body(self) -> str:
65        """Read the markdown body from SKILL.md (content after YAML frontmatter)."""
66        return _parse_skill_md_body(self.skill_md_raw())

Read the markdown body from SKILL.md (content after YAML frontmatter).

def references_dir(self) -> pathlib.Path:
70    def references_dir(self) -> Path:
71        if self.path is None:
72            raise ValueError(
73                "Skill must be saved before accessing references directory"
74            )
75        return self.path.parent / "references"
def assets_dir(self) -> pathlib.Path:
77    def assets_dir(self) -> Path:
78        if self.path is None:
79            raise ValueError("Skill must be saved before accessing assets directory")
80        return self.path.parent / "assets"
def read_reference(self, relative_path: str) -> str:
82    def read_reference(self, relative_path: str) -> str:
83        """Read a reference file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing."""
84        return self._read_resource(self.references_dir(), relative_path)

Read a reference file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.

def read_asset(self, relative_path: str) -> str:
86    def read_asset(self, relative_path: str) -> str:
87        """Read an asset file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing."""
88        return self._read_resource(self.assets_dir(), relative_path)

Read an asset file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.

def save_skill_md(self, body: str) -> None:
116    def save_skill_md(self, body: str) -> None:
117        """Write SKILL.md with YAML frontmatter (name, description) + markdown body.
118
119        Reads name and description from self to keep SKILL.md in sync with skill.kiln.
120        """
121        if not body or not body.strip():
122            raise ValueError("body must be non-empty")
123        frontmatter = yaml.dump(
124            {"name": self.name, "description": self.description},
125            default_flow_style=False,
126            allow_unicode=True,
127            sort_keys=False,
128        ).rstrip("\n")
129        content = f"---\n{frontmatter}\n---\n\n{body}"
130        self.skill_md_path().write_text(content, encoding="utf-8")
131        self.references_dir().mkdir(exist_ok=True)
132        self.assets_dir().mkdir(exist_ok=True)

Write SKILL.md with YAML frontmatter (name, description) + markdown body.

Reads name and description from self to keep SKILL.md in sync with skill.kiln.

def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Returns the name of the relationship this model has to its parent (generated accessor).

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Returns the parent model class for this child model (generated accessor).

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class StructuredOutputMode(builtins.str, enum.Enum):
28class StructuredOutputMode(str, Enum):
29    """
30    Enumeration of supported structured output modes.
31
32    - json_schema: request json using API capabilities for json_schema
33    - function_calling: request json using API capabilities for function calling
34    - json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema
35    - json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings.
36    - json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
37    - json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
38    - default: let the adapter decide (legacy, do not use for new use cases)
39    - unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should lookup best option at runtime.
40    """
41
42    default = "default"
43    json_schema = "json_schema"
44    function_calling_weak = "function_calling_weak"
45    function_calling = "function_calling"
46    json_mode = "json_mode"
47    json_instructions = "json_instructions"
48    json_instruction_and_object = "json_instruction_and_object"
49    json_custom_instructions = "json_custom_instructions"
50    unknown = "unknown"

Enumeration of supported structured output modes.

  • json_schema: request json using API capabilities for json_schema
  • function_calling: request json using API capabilities for function calling
  • json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema
  • json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings.
  • json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
  • json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
  • default: let the adapter decide (legacy, do not use for new use cases)
  • unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should lookup best option at runtime.
default = <StructuredOutputMode.default: 'default'>
json_schema = <StructuredOutputMode.json_schema: 'json_schema'>
function_calling_weak = <StructuredOutputMode.function_calling_weak: 'function_calling_weak'>
function_calling = <StructuredOutputMode.function_calling: 'function_calling'>
json_mode = <StructuredOutputMode.json_mode: 'json_mode'>
json_instructions = <StructuredOutputMode.json_instructions: 'json_instructions'>
json_instruction_and_object = <StructuredOutputMode.json_instruction_and_object: 'json_instruction_and_object'>
json_custom_instructions = <StructuredOutputMode.json_custom_instructions: 'json_custom_instructions'>
unknown = <StructuredOutputMode.unknown: 'unknown'>
class Task(kiln_ai.datamodel.basemodel.KilnParentedModel, kiln_ai.datamodel.basemodel.KilnParentModel):
126class Task(
127    KilnParentedModel,
128    KilnParentModel,
129    parent_of={
130        "runs": TaskRun,
131        "dataset_splits": DatasetSplit,
132        "finetunes": Finetune,
133        "prompt_optimization_jobs": PromptOptimizationJob,
134        "prompts": Prompt,
135        "evals": Eval,
136        "specs": Spec,
137        "run_configs": TaskRunConfig,
138    },
139):
140    """
141    Represents a specific task to be performed, with associated requirements and validation rules.
142
143    Contains the task definition, requirements, input/output schemas, and maintains
144    a collection of task runs.
145    """
146
147    name: FilenameString = Field(description="The name of the task.")
148    description: str | None = Field(
149        default=None,
150        description="A description of the task for you and your team. Will not be used in prompts/training/validation.",
151    )
152    instruction: str = Field(
153        min_length=1,
154        description="The instructions for the task. Will be used in prompts/training/validation.",
155    )
156    requirements: List[TaskRequirement] = Field(
157        default=[],
158        description="Deprecated: Use specs and prompts instead.",
159    )
160    output_json_schema: JsonObjectSchema | None = Field(
161        default=None,
162        description="JSON schema for structured task output. Must be an object schema.",
163    )
164    input_json_schema: JsonSchema | None = Field(
165        default=None,
166        description="JSON schema for structured task input. Can be an object or array schema.",
167    )
168    thinking_instruction: str | None = Field(
169        default=None,
170        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting.",
171    )
172
173    default_run_config_id: ID_TYPE | None = Field(
174        default=None,
175        description="ID of the run config to use for this task by default. Must exist in saved run configs for this task.",
176    )
177
178    def output_schema(self) -> Dict | None:
179        if self.output_json_schema is None:
180            return None
181        return schema_from_json_str(self.output_json_schema)
182
183    def input_schema(self) -> Dict | None:
184        if self.input_json_schema is None:
185            return None
186        # Allow arrays, not just objects
187        return schema_from_json_str(self.input_json_schema, require_object=False)
188
189    # These wrappers help for typechecking. We should fix this in KilnParentModel
190    def runs(self, readonly: bool = False) -> list[TaskRun]:
191        return super().runs(readonly=readonly)  # type: ignore
192
193    def dataset_splits(self, readonly: bool = False) -> list[DatasetSplit]:
194        return super().dataset_splits(readonly=readonly)  # type: ignore
195
196    def finetunes(self, readonly: bool = False) -> list[Finetune]:
197        return super().finetunes(readonly=readonly)  # type: ignore
198
199    def prompts(self, readonly: bool = False) -> list[Prompt]:
200        return super().prompts(readonly=readonly)  # type: ignore
201
202    def evals(self, readonly: bool = False) -> list[Eval]:
203        return super().evals(readonly=readonly)  # type: ignore
204
205    def run_configs(self, readonly: bool = False) -> list[TaskRunConfig]:
206        return super().run_configs(readonly=readonly)  # type: ignore
207
208    def specs(self, readonly: bool = False) -> list[Spec]:
209        return super().specs(readonly=readonly)  # type: ignore
210
211    def prompt_optimization_jobs(
212        self, readonly: bool = False
213    ) -> list[PromptOptimizationJob]:
214        return super().prompt_optimization_jobs(readonly=readonly)  # type: ignore
215
216    # Workaround to return typed parent without importing Task
217    def parent_project(self) -> Union["Project", None]:
218        if self.parent is None or self.parent.__class__.__name__ != "Project":
219            return None
220        return self.parent  # type: ignore

Represents a specific task to be performed, with associated requirements and validation rules.

Contains the task definition, requirements, input/output schemas, and maintains a collection of task runs.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn at 0x7f90236f9b20>, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=120, pattern=None)]
description: str | None
instruction: str
requirements: List[TaskRequirement]
output_json_schema: Optional[Annotated[str, AfterValidator(func=<function <lambda> at 0x7f9023873240>)]]
input_json_schema: Optional[Annotated[str, AfterValidator(func=<function <lambda> at 0x7f90238737e0>)]]
thinking_instruction: str | None
default_run_config_id: Optional[str]
def output_schema(self) -> Optional[Dict]:
178    def output_schema(self) -> Dict | None:
179        if self.output_json_schema is None:
180            return None
181        return schema_from_json_str(self.output_json_schema)
def input_schema(self) -> Optional[Dict]:
183    def input_schema(self) -> Dict | None:
184        if self.input_json_schema is None:
185            return None
186        # Allow arrays, not just objects
187        return schema_from_json_str(self.input_json_schema, require_object=False)
def runs(self, readonly=False) -> List[TaskRun]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all TaskRun children of this task (generated child accessor).

def dataset_splits( self, readonly=False) -> List[DatasetSplit]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all DatasetSplit children of this task (generated child accessor).

def finetunes(self, readonly=False) -> List[Finetune]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all Finetune children of this task (generated child accessor).

def prompts(self, readonly=False) -> List[Prompt]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all Prompt children of this task (generated child accessor).

def evals(self, readonly=False) -> List[kiln_ai.datamodel.eval.Eval]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all Eval children of this task (generated child accessor).

def run_configs(self, readonly=False) -> List[kiln_ai.datamodel.task.TaskRunConfig]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all TaskRunConfig children of this task (generated child accessor).

def specs(self, readonly=False) -> List[kiln_ai.datamodel.spec.Spec]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all Spec children of this task (generated child accessor).

def prompt_optimization_jobs( self, readonly=False) -> List[PromptOptimizationJob]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all PromptOptimizationJob children of this task (generated child accessor).

def parent_project(self) -> Optional[Project]:
217    def parent_project(self) -> Union["Project", None]:
218        if self.parent is None or self.parent.__class__.__name__ != "Project":
219            return None
220        return self.parent  # type: ignore
def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Returns the name of the relationship this model has to its parent (generated accessor).

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Returns the parent model class for this child model (generated accessor).

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class TaskOutput(kiln_ai.datamodel.basemodel.KilnBaseModel):
325class TaskOutput(KilnBaseModel):
326    """
327    An output for a specific task run.
328
329    Contains the actual output content, its source (human or synthetic),
330    and optional rating information.
331    """
332
333    output: str = Field(
334        description="The output of the task. JSON formatted for structured output, plaintext for unstructured output."
335    )
336    source: DataSource | None = Field(
337        description="The source of the output: human or synthetic.",
338        default=None,
339    )
340    rating: TaskOutputRating | None = Field(
341        default=None, description="The rating of the output"
342    )
343
344    def validate_output_format(self, task: "Task") -> Self:
345        # validate output
346        if task.output_json_schema is not None:
347            try:
348                output_parsed = json.loads(self.output)
349            except json.JSONDecodeError:
350                raise ValueError("Output is not a valid JSON object")
351
352            validate_schema_with_value_error(
353                output_parsed,
354                task.output_json_schema,
355                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
356            )
357        return self
358
359    @model_validator(mode="after")
360    def validate_output_source(self, info: ValidationInfo) -> Self:
361        # On strict mode and not loaded from file, we validate output_source is not None.
362        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
363        if not strict_mode():
364            return self
365        if self.loaded_from_file(info):
366            return self
367        if self.source is None:
368            raise ValueError("Output source is required when strict mode is enabled")
369        return self

An output for a specific task run.

Contains the actual output content, its source (human or synthetic), and optional rating information.

output: str
source: DataSource | None
rating: TaskOutputRating | None
def validate_output_format(self, task: Task) -> Self:
344    def validate_output_format(self, task: "Task") -> Self:
345        # validate output
346        if task.output_json_schema is not None:
347            try:
348                output_parsed = json.loads(self.output)
349            except json.JSONDecodeError:
350                raise ValueError("Output is not a valid JSON object")
351
352            validate_schema_with_value_error(
353                output_parsed,
354                task.output_json_schema,
355                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
356            )
357        return self
@model_validator(mode='after')
def validate_output_source(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
359    @model_validator(mode="after")
360    def validate_output_source(self, info: ValidationInfo) -> Self:
361        # On strict mode and not loaded from file, we validate output_source is not None.
362        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
363        if not strict_mode():
364            return self
365        if self.loaded_from_file(info):
366            return self
367        if self.source is None:
368            raise ValueError("Output source is required when strict mode is enabled")
369        return self
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class TaskOutputRating(kiln_ai.datamodel.basemodel.KilnBaseModel):
 50class TaskOutputRating(KilnBaseModel):
 51    """
 52    A rating for a task output, including an overall rating and ratings for each requirement.
 53
 54    Supports:
 55    - five_star: 1-5 star ratings
 56    - pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
 57    - pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
 58    """
 59
 60    type: TaskOutputRatingType = Field(
 61        default=TaskOutputRatingType.five_star,
 62        description="The rating system used for this rating.",
 63    )
 64    value: float | None = Field(
 65        description="The rating value. Interpretation depends on rating type:\n- five_star: 1-5 stars\n- pass_fail: 1.0 (pass) or 0.0 (fail)\n- pass_fail_critical: 1.0 (pass), 0.0 (fail), or -1.0 (critical fail)",
 66        default=None,
 67    )
 68    requirement_ratings: Dict[ID_TYPE, RequirementRating] = Field(
 69        default={},
 70        description="The ratings of the requirements of the task. The ID can be either a task_requirement_id or a named rating for an eval_output_score name (in format 'named::<name>').",
 71    )
 72
 73    # Previously we stored rating values as a dict of floats, but now we store them as RequirementRating objects.
 74    @model_validator(mode="before")
 75    def upgrade_old_format(cls, data: dict) -> dict:
 76        if not isinstance(data, dict):
 77            return data
 78
 79        # Check if we have the old format (dict of floats)
 80        req_ratings = data.get("requirement_ratings", {})
 81        if req_ratings and all(
 82            isinstance(v, (int, float)) for v in req_ratings.values()
 83        ):
 84            # Convert each float to a RequirementRating object
 85            # all ratings are five star at the point we used this format
 86            data["requirement_ratings"] = {
 87                k: {"value": v, "type": TaskOutputRatingType.five_star}
 88                for k, v in req_ratings.items()
 89            }
 90
 91        return data
 92
 93    # Used to select high quality outputs for example selection (MultiShotPromptBuilder, etc)
 94    def is_high_quality(self) -> bool:
 95        if self.value is None:
 96            return False
 97
 98        if self.type == TaskOutputRatingType.five_star:
 99            return self.value >= 4
100        elif self.type == TaskOutputRatingType.pass_fail:
101            return self.value == 1.0
102        elif self.type == TaskOutputRatingType.pass_fail_critical:
103            return self.value == 1.0
104        return False
105
106    @model_validator(mode="after")
107    def validate_rating(self) -> Self:
108        if self.type not in TaskOutputRatingType:
109            raise ValueError(f"Invalid rating type: {self.type}")
110
111        # Overall rating is optional
112        if self.value is not None:
113            self._validate_rating(self.type, self.value, "overall rating")
114
115        for req_id, req_rating in self.requirement_ratings.items():
116            self._validate_rating(
117                req_rating.type,
118                req_rating.value,
119                f"requirement rating for req ID: {req_id}",
120            )
121
122        return self
123
124    def _validate_rating(
125        self, type: TaskOutputRatingType, rating: float | None, rating_name: str
126    ) -> None:
127        if type == TaskOutputRatingType.five_star:
128            self._validate_five_star(rating, rating_name)
129        elif type == TaskOutputRatingType.pass_fail:
130            self._validate_pass_fail(rating, rating_name)
131        elif type == TaskOutputRatingType.pass_fail_critical:
132            self._validate_pass_fail_critical(rating, rating_name)
133
134    def _validate_five_star(self, rating: float | None, rating_name: str) -> None:
135        if rating is None or not isinstance(rating, float) or not rating.is_integer():
136            raise ValueError(
137                f"{rating_name.capitalize()} of type five_star must be an integer value (1-5)"
138            )
139        if rating < 1 or rating > 5:
140            raise ValueError(
141                f"{rating_name.capitalize()} of type five_star must be between 1 and 5 stars"
142            )
143
144    def _validate_pass_fail(self, rating: float | None, rating_name: str) -> None:
145        if rating is None or not isinstance(rating, float) or not rating.is_integer():
146            raise ValueError(
147                f"{rating_name.capitalize()} of type pass_fail must be an integer value (0 or 1)"
148            )
149        if rating not in [0, 1]:
150            raise ValueError(
151                f"{rating_name.capitalize()} of type pass_fail must be 0 (fail) or 1 (pass)"
152            )
153
154    def _validate_pass_fail_critical(
155        self, rating: float | None, rating_name: str
156    ) -> None:
157        if rating is None or not isinstance(rating, float) or not rating.is_integer():
158            raise ValueError(
159                f"{rating_name.capitalize()} of type pass_fail_critical must be an integer value (-1, 0, or 1)"
160            )
161        if rating not in [-1, 0, 1]:
162            raise ValueError(
163                f"{rating_name.capitalize()} of type pass_fail_critical must be -1 (critical fail), 0 (fail), or 1 (pass)"
164            )

A rating for a task output, including an overall rating and ratings for each requirement.

Supports:

  • five_star: 1-5 star ratings
  • pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
  • pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
value: float | None
requirement_ratings: Dict[Optional[str], RequirementRating]
@model_validator(mode='before')
def upgrade_old_format(cls, data: dict) -> dict:
74    @model_validator(mode="before")
75    def upgrade_old_format(cls, data: dict) -> dict:
76        if not isinstance(data, dict):
77            return data
78
79        # Check if we have the old format (dict of floats)
80        req_ratings = data.get("requirement_ratings", {})
81        if req_ratings and all(
82            isinstance(v, (int, float)) for v in req_ratings.values()
83        ):
84            # Convert each float to a RequirementRating object
85            # all ratings are five star at the point we used this format
86            data["requirement_ratings"] = {
87                k: {"value": v, "type": TaskOutputRatingType.five_star}
88                for k, v in req_ratings.items()
89            }
90
91        return data
def is_high_quality(self) -> bool:
 94    def is_high_quality(self) -> bool:
 95        if self.value is None:
 96            return False
 97
 98        if self.type == TaskOutputRatingType.five_star:
 99            return self.value >= 4
100        elif self.type == TaskOutputRatingType.pass_fail:
101            return self.value == 1.0
102        elif self.type == TaskOutputRatingType.pass_fail_critical:
103            return self.value == 1.0
104        return False
@model_validator(mode='after')
def validate_rating(self) -> Self:
106    @model_validator(mode="after")
107    def validate_rating(self) -> Self:
108        if self.type not in TaskOutputRatingType:
109            raise ValueError(f"Invalid rating type: {self.type}")
110
111        # Overall rating is optional
112        if self.value is not None:
113            self._validate_rating(self.type, self.value, "overall rating")
114
115        for req_id, req_rating in self.requirement_ratings.items():
116            self._validate_rating(
117                req_rating.type,
118                req_rating.value,
119                f"requirement rating for req ID: {req_id}",
120            )
121
122        return self
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args:
    self: The BaseModel instance.
    context: The context passed by pydantic-core.

class TaskOutputRatingType(builtins.str, enum.Enum):
19class TaskOutputRatingType(str, Enum):
20    """Defines the types of rating systems available for task outputs."""
21
22    five_star = "five_star"
23    pass_fail = "pass_fail"
24    pass_fail_critical = "pass_fail_critical"
25    custom = "custom"

Defines the types of rating systems available for task outputs.

five_star = <TaskOutputRatingType.five_star: 'five_star'>
pass_fail = <TaskOutputRatingType.pass_fail: 'pass_fail'>
pass_fail_critical = <TaskOutputRatingType.pass_fail_critical: 'pass_fail_critical'>
custom = <TaskOutputRatingType.custom: 'custom'>
class TaskRequirement(pydantic.main.BaseModel):
37class TaskRequirement(BaseModel):
38    """
39    Defines a specific requirement that should be met by task outputs.
40
41    Includes an identifier, name, description, instruction for meeting the requirement,
42    priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
43    """
44
45    id: ID_TYPE = ID_FIELD
46    name: FilenameStringShort = Field(description="The name of the task requirement.")
47    description: str | None = Field(
48        default=None,
49        description="Optional elaboration on the requirement's purpose.",
50    )
51    instruction: str = Field(
52        min_length=1, description="Instructions for meeting the requirement."
53    )
54    priority: Priority = Field(
55        default=Priority.p2, description="The priority level of the requirement."
56    )
57    type: TaskOutputRatingType = Field(
58        default=TaskOutputRatingType.five_star,
59        description="The rating type used to evaluate this requirement.",
60    )

Defines a specific requirement that should be met by task outputs.

Includes an identifier, name, description, instruction for meeting the requirement, priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).

id: Optional[str]
name: Annotated[str, BeforeValidator(func=name_validator.<locals>.fn, json_schema_input_type=PydanticUndefined), StringConstraints(strip_whitespace=None, to_upper=None, to_lower=None, strict=None, min_length=1, max_length=32, pattern=None)]
description: str | None
instruction: str
priority: Priority
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class TaskRun(kiln_ai.datamodel.basemodel.KilnParentedModel, kiln_ai.datamodel.basemodel.KilnParentModel):
 89class TaskRun(
 90    KilnParentedModel,
 91    KilnParentModel,
 92    parent_of={
 93        "feedback": Feedback,
 94    },
 95):
 96    """
 97    Represents a single execution of a Task.
 98
 99    Contains the input used, its source, the output produced, and optional
100    repair information if the output needed correction.
101    """
102
103    input: str = Field(
104        description="The inputs to the task. JSON formatted for structured input, plaintext for unstructured input."
105    )
106    input_source: DataSource | None = Field(
107        default=None, description="The source of the input: human or synthetic."
108    )
109
110    output: TaskOutput = Field(description="The output of the task run.")
111    repair_instructions: str | None = Field(
112        default=None,
113        description="Instructions for fixing the output. Should define what is wrong, and how to fix it. Will be used by models for both generating a fixed output, and evaluating future models.",
114    )
115    repaired_output: TaskOutput | None = Field(
116        default=None,
117        description="A version of the output with issues fixed. This must be a 'fixed' version of the existing output, and not an entirely new output. If you wish to generate an ideal curatorial output for this task unrelated to this output, generate a new TaskOutput with type 'human' instead of using this field.",
118    )
119    intermediate_outputs: Dict[str, str] | None = Field(
120        default=None,
121        description="Intermediate outputs from the task run. Keys are the names of the intermediate output steps (cot=chain of thought, etc), values are the output data.",
122    )
123    tags: List[str] = Field(
124        default=[],
125        description="Tags for the task run. Tags are used to categorize task runs for filtering and reporting.",
126    )
127    usage: Usage | None = Field(
128        default=None,
129        description="Usage information for the task run. This includes the number of input tokens, output tokens, and total tokens used.",
130    )
131    trace: list[ChatCompletionMessageParam] | None = Field(
132        default=None,
133        description="The trace of the task run in OpenAI format. This is the list of messages that were sent to/from the model.",
134    )
135    parent_task_run_id: str | None = Field(
136        default=None,
137        description="The ID of the parent task run. This is the ID of the task run that contains this task run.",
138    )
139
140    @property
141    def is_toolcall_pending(self) -> bool:
142        """True if the trace ends with an assistant message awaiting client tool execution."""
143        return trace_has_pending_client_tool_calls(self.trace)
144
145    def thinking_training_data(self) -> str | None:
146        """
147        Get the thinking training data from the task run.
148        """
149        if self.intermediate_outputs is None:
150            return None
151        return self.intermediate_outputs.get(
152            "reasoning"
153        ) or self.intermediate_outputs.get("chain_of_thought")
154
155    def has_thinking_training_data(self) -> bool:
156        """
157        Does this run have thinking data that we can use to train a thinking model?
158        """
159        return self.thinking_training_data() is not None
160
161    def feedback(self, readonly: bool = False) -> list[Feedback]:
162        return super().feedback(readonly=readonly)  # type: ignore
163
164    # Workaround to return typed parent without importing Task
165    def parent_task(self) -> Union["Task", None]:
166        if self.parent is None or self.parent.__class__.__name__ != "Task":
167            return None
168        return self.parent  # type: ignore
169
170    @model_validator(mode="after")
171    def validate_input_format(self, info: ValidationInfo) -> Self:
172        # Don't validate if loading from file (not new). Too slow.
173        # We don't allow changing task schema, so this is redundant validation.
174        # Note: we still validate if editing a loaded model
175        if self.loading_from_file(info):
176            # Consider loading an existing model as validated.
177            self._last_validated_input = self.input
178            return self
179
180        # Don't validate if input has not changed. Too slow to run this every time.
181        if (
182            hasattr(self, "_last_validated_input")
183            and self.input == self._last_validated_input
184        ):
185            return self
186
187        task = self.parent_task()
188        if task is None:
189            # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
190            return self
191
192        # validate input
193        if task.input_json_schema is not None:
194            try:
195                input_parsed = json.loads(self.input)
196            except json.JSONDecodeError:
197                raise ValueError("Input is not a valid JSON object")
198
199            validate_schema_with_value_error(
200                input_parsed,
201                task.input_json_schema,
202                "Input does not match task input schema.",
203                require_object=False,
204            )
205
206        self._last_validated_input = self.input
207        return self
208
209    @model_validator(mode="after")
210    def validate_output_format(self, info: ValidationInfo) -> Self:
211        # Don't validate if loading from file (not new). Too slow.
212        # Note: we still validate if editing a loaded model's output.
213        if self.loading_from_file(info):
214            # Consider loading an existing model as validated.
215            self._last_validated_output = self.output.output if self.output else None
216            return self
217
218        # Skip output validation when the run is waiting for tool call results.
219        # The output field is empty/partial in this state.
220        if self.is_toolcall_pending:
221            self._last_validated_output = self.output.output if self.output else None
222            return self
223
224        # Don't validate unless output has changed since last validation.
225        # The validator is slow and costly, don't want it running when setting other fields.
226        if (
227            hasattr(self, "_last_validated_output")
228            and self.output is not None
229            and self.output.output == self._last_validated_output
230        ):
231            return self
232
233        task = self.parent_task()
234        if task is None:
235            return self
236
237        self.output.validate_output_format(task)
238        self._last_validated_output = self.output.output if self.output else None
239        return self
240
241    @model_validator(mode="after")
242    def validate_repaired_output(self) -> Self:
243        if self.repaired_output is not None:
244            if self.repaired_output.rating is not None:
245                raise ValueError(
246                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
247                )
248
249            task = self.parent_task()
250            if (
251                task is not None
252                and self.repaired_output.output is not None
253                and task.output_json_schema is not None
254            ):
255                try:
256                    output_parsed = json.loads(self.repaired_output.output)
257                except json.JSONDecodeError:
258                    raise ValueError("Repaired output is not a valid JSON object")
259
260                validate_schema_with_value_error(
261                    output_parsed,
262                    task.output_json_schema,
263                    "Repaired output does not match task output schema.",
264                )
265
266        if self.repair_instructions is None and self.repaired_output is not None:
267            raise ValueError(
268                "Repair instructions are required if providing a repaired output."
269            )
270        if self.repair_instructions is not None and self.repaired_output is None:
271            raise ValueError(
272                "A repaired output is required if providing repair instructions."
273            )
274
275        return self
276
277    @model_validator(mode="after")
278    def validate_input_source(self, info: ValidationInfo) -> Self:
279        # On strict mode and not loaded from file, we validate input_source is not None.
280        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
281        if not strict_mode():
282            return self
283        if self.loaded_from_file(info):
284            return self
285        if self.input_source is None:
286            raise ValueError("input_source is required when strict mode is enabled")
287        return self
288
289    @model_validator(mode="after")
290    def validate_tags(self) -> Self:
291        for tag in self.tags:
292            if not tag:
293                raise ValueError("Tags cannot be empty strings")
294            if " " in tag:
295                raise ValueError("Tags cannot contain spaces. Try underscores.")
296
297        return self

Represents a single execution of a Task.

Contains the input used, its source, the output produced, and optional repair information if the output needed correction.

input: str
input_source: DataSource | None
output: TaskOutput
repair_instructions: str | None
repaired_output: TaskOutput | None
intermediate_outputs: Optional[Dict[str, str]]
tags: List[str]
usage: Usage | None
trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None
parent_task_run_id: str | None
is_toolcall_pending: bool
140    @property
141    def is_toolcall_pending(self) -> bool:
142        """True if the trace ends with an assistant message awaiting client tool execution."""
143        return trace_has_pending_client_tool_calls(self.trace)

True if the trace ends with an assistant message awaiting client tool execution.

def thinking_training_data(self) -> str | None:
145    def thinking_training_data(self) -> str | None:
146        """
147        Get the thinking training data from the task run.
148        """
149        if self.intermediate_outputs is None:
150            return None
151        return self.intermediate_outputs.get(
152            "reasoning"
153        ) or self.intermediate_outputs.get("chain_of_thought")

Get the thinking training data from the task run.

def has_thinking_training_data(self) -> bool:
155    def has_thinking_training_data(self) -> bool:
156        """
157        Does this run have thinking data that we can use to train a thinking model?
158        """
159        return self.thinking_training_data() is not None

Does this run have thinking data that we can use to train a thinking model?

def feedback(self, readonly=False) -> List[Feedback]:
743        def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
744            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

Returns all Feedback children of this task run, loaded from this run's path (accessor generated by the `parent_of` relationship).

def parent_task(self) -> Optional[Task]:
165    def parent_task(self) -> Union["Task", None]:
166        if self.parent is None or self.parent.__class__.__name__ != "Task":
167            return None
168        return self.parent  # type: ignore
@model_validator(mode='after')
def validate_input_format(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
170    @model_validator(mode="after")
171    def validate_input_format(self, info: ValidationInfo) -> Self:
172        # Don't validate if loading from file (not new). Too slow.
173        # We don't allow changing task schema, so this is redundant validation.
174        # Note: we still validate if editing a loaded model
175        if self.loading_from_file(info):
176            # Consider loading an existing model as validated.
177            self._last_validated_input = self.input
178            return self
179
180        # Don't validate if input has not changed. Too slow to run this every time.
181        if (
182            hasattr(self, "_last_validated_input")
183            and self.input == self._last_validated_input
184        ):
185            return self
186
187        task = self.parent_task()
188        if task is None:
189            # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
190            return self
191
192        # validate input
193        if task.input_json_schema is not None:
194            try:
195                input_parsed = json.loads(self.input)
196            except json.JSONDecodeError:
197                raise ValueError("Input is not a valid JSON object")
198
199            validate_schema_with_value_error(
200                input_parsed,
201                task.input_json_schema,
202                "Input does not match task input schema.",
203                require_object=False,
204            )
205
206        self._last_validated_input = self.input
207        return self
@model_validator(mode='after')
def validate_output_format(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
209    @model_validator(mode="after")
210    def validate_output_format(self, info: ValidationInfo) -> Self:
211        # Don't validate if loading from file (not new). Too slow.
212        # Note: we still validate if editing a loaded model's output.
213        if self.loading_from_file(info):
214            # Consider loading an existing model as validated.
215            self._last_validated_output = self.output.output if self.output else None
216            return self
217
218        # Skip output validation when the run is waiting for tool call results.
219        # The output field is empty/partial in this state.
220        if self.is_toolcall_pending:
221            self._last_validated_output = self.output.output if self.output else None
222            return self
223
224        # Don't validate unless output has changed since last validation.
225        # The validator is slow and costly, don't want it running when setting other fields.
226        if (
227            hasattr(self, "_last_validated_output")
228            and self.output is not None
229            and self.output.output == self._last_validated_output
230        ):
231            return self
232
233        task = self.parent_task()
234        if task is None:
235            return self
236
237        self.output.validate_output_format(task)
238        self._last_validated_output = self.output.output if self.output else None
239        return self
@model_validator(mode='after')
def validate_repaired_output(self) -> Self:
241    @model_validator(mode="after")
242    def validate_repaired_output(self) -> Self:
243        if self.repaired_output is not None:
244            if self.repaired_output.rating is not None:
245                raise ValueError(
246                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
247                )
248
249            task = self.parent_task()
250            if (
251                task is not None
252                and self.repaired_output.output is not None
253                and task.output_json_schema is not None
254            ):
255                try:
256                    output_parsed = json.loads(self.repaired_output.output)
257                except json.JSONDecodeError:
258                    raise ValueError("Repaired output is not a valid JSON object")
259
260                validate_schema_with_value_error(
261                    output_parsed,
262                    task.output_json_schema,
263                    "Repaired output does not match task output schema.",
264                )
265
266        if self.repair_instructions is None and self.repaired_output is not None:
267            raise ValueError(
268                "Repair instructions are required if providing a repaired output."
269            )
270        if self.repair_instructions is not None and self.repaired_output is None:
271            raise ValueError(
272                "A repaired output is required if providing repair instructions."
273            )
274
275        return self
@model_validator(mode='after')
def validate_input_source(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
277    @model_validator(mode="after")
278    def validate_input_source(self, info: ValidationInfo) -> Self:
279        # On strict mode and not loaded from file, we validate input_source is not None.
280        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
281        if not strict_mode():
282            return self
283        if self.loaded_from_file(info):
284            return self
285        if self.input_source is None:
286            raise ValueError("input_source is required when strict mode is enabled")
287        return self
@model_validator(mode='after')
def validate_tags(self) -> Self:
289    @model_validator(mode="after")
290    def validate_tags(self) -> Self:
291        for tag in self.tags:
292            if not tag:
293                raise ValueError("Tags cannot be empty strings")
294            if " " in tag:
295                raise ValueError("Tags cannot contain spaces. Try underscores.")
296
297        return self
def relationship_name() -> str:
761        def relationship_name_method() -> str:
762            return relationship_name

Returns the name of the parent-of relationship ("feedback" for TaskRun). Generated relationship metadata method.

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
754        def parent_class_method() -> Type[KilnParentModel]:
755            return cls

Returns the KilnParentModel subclass that owns this child relationship (TaskRun for Feedback children). Generated relationship metadata method.

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class Usage(pydantic.main.BaseModel):
22class Usage(BaseModel):
23    """Token usage and cost information for a task run."""
24
25    input_tokens: int | None = Field(
26        default=None,
27        description="The number of input tokens used in the task run.",
28        ge=0,
29    )
30    output_tokens: int | None = Field(
31        default=None,
32        description="The number of output tokens used in the task run.",
33        ge=0,
34    )
35    total_tokens: int | None = Field(
36        default=None,
37        description="The total number of tokens used in the task run.",
38        ge=0,
39    )
40    cost: float | None = Field(
41        default=None,
42        description="The cost of the task run in US dollars, saved at runtime (prices can change over time).",
43        ge=0,
44    )
45    cached_tokens: int | None = Field(
46        default=None,
47        description="Number of tokens served from prompt cache. None if not reported.",
48        ge=0,
49    )
50
51    def __add__(self, other: "Usage") -> "Usage":
52        """Add two Usage objects together, handling None values gracefully.
53
54        None + None = None
55        None + value = value
56        value + None = value
57        value1 + value2 = value1 + value2
58        """
59        if not isinstance(other, Usage):
60            raise TypeError(f"Cannot add Usage with {type(other).__name__}")
61
62        def _add_optional_int(a: int | None, b: int | None) -> int | None:
63            if a is None and b is None:
64                return None
65            if a is None:
66                return b
67            if b is None:
68                return a
69            return a + b
70
71        def _add_optional_float(a: float | None, b: float | None) -> float | None:
72            if a is None and b is None:
73                return None
74            if a is None:
75                return b
76            if b is None:
77                return a
78            return a + b
79
80        return Usage(
81            input_tokens=_add_optional_int(self.input_tokens, other.input_tokens),
82            output_tokens=_add_optional_int(self.output_tokens, other.output_tokens),
83            total_tokens=_add_optional_int(self.total_tokens, other.total_tokens),
84            cost=_add_optional_float(self.cost, other.cost),
85            cached_tokens=_add_optional_int(self.cached_tokens, other.cached_tokens),
86        )

Token usage and cost information for a task run.

input_tokens: int | None
output_tokens: int | None
total_tokens: int | None
cost: float | None
cached_tokens: int | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def generate_model_id() -> str:
38def generate_model_id() -> str:
39    return str(uuid.uuid4().int)[:12]
prompt_generator_values = ['simple_prompt_builder', 'multi_shot_prompt_builder', 'few_shot_prompt_builder', 'repairs_prompt_builder', 'simple_chain_of_thought_prompt_builder', 'few_shot_chain_of_thought_prompt_builder', 'multi_shot_chain_of_thought_prompt_builder']