kiln_ai.datamodel

See our docs for details about our datamodel classes and hierarchy:

Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html

User docs: https://docs.kiln.tech/developers/kiln-datamodel

 1"""
 2See our docs for details about our datamodel classes and hierarchy:
 3
 4Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html
 5
 6User docs: https://docs.kiln.tech/developers/kiln-datamodel
 7"""
 8
 9# This component uses "flat" imports so we don't have too much internal structure exposed in the API.
10# for example you can just `from datamodel import Task, Project` instead of `from datamodel.task import Task; from datamodel.project import Project`
11
12from __future__ import annotations
13
14from kiln_ai.datamodel import (
15    chunk,
16    dataset_split,
17    embedding,
18    eval,
19    extraction,
20    rag,
21    strict_mode,
22)
23from kiln_ai.datamodel.datamodel_enums import (
24    FineTuneStatusType,
25    Priority,
26    StructuredOutputMode,
27    TaskOutputRatingType,
28)
29from kiln_ai.datamodel.dataset_split import DatasetSplit, DatasetSplitDefinition
30from kiln_ai.datamodel.external_tool_server import ExternalToolServer
31from kiln_ai.datamodel.finetune import Finetune
32from kiln_ai.datamodel.project import Project
33from kiln_ai.datamodel.prompt import BasePrompt, Prompt
34from kiln_ai.datamodel.prompt_id import (
35    PromptGenerators,
36    PromptId,
37    prompt_generator_values,
38)
39from kiln_ai.datamodel.task import Task, TaskRequirement
40from kiln_ai.datamodel.task_output import (
41    DataSource,
42    DataSourceProperty,
43    DataSourceType,
44    RequirementRating,
45    TaskOutput,
46    TaskOutputRating,
47)
48from kiln_ai.datamodel.task_run import TaskRun, Usage
49
50__all__ = [
51    "BasePrompt",
52    "DataSource",
53    "DataSourceProperty",
54    "DataSourceType",
55    "DatasetSplit",
56    "DatasetSplitDefinition",
57    "ExternalToolServer",
58    "FineTuneStatusType",
59    "Finetune",
60    "Priority",
61    "Project",
62    "Prompt",
63    "PromptGenerators",
64    "PromptId",
65    "RequirementRating",
66    "StructuredOutputMode",
67    "Task",
68    "TaskOutput",
69    "TaskOutputRating",
70    "TaskOutputRatingType",
71    "TaskRequirement",
72    "TaskRun",
73    "Usage",
74    "chunk",
75    "dataset_split",
76    "embedding",
77    "eval",
78    "extraction",
79    "prompt_generator_values",
80    "rag",
81    "strict_mode",
82]
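
Because everything public is re-exported at the top level, callers only need the flat imports. A minimal usage sketch (the names and instruction text are illustrative; `instruction` is assumed to be the Task's required prompt field):

from kiln_ai.datamodel import Project, Task

# Flat imports: no need to reach into kiln_ai.datamodel.task or kiln_ai.datamodel.project.
project = Project(name="Demo Project")
task = Task(
    name="Summarizer",
    instruction="Summarize the input text in one sentence.",
    parent=project,
)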
class BasePrompt(pydantic.main.BaseModel):
class BasePrompt(BaseModel):
    """
    A prompt for a task. This is the basic data storage format which can be used throughout a project.

    The "Prompt" model name is reserved for the custom prompts parented by a task.
    """

    name: FilenameString = Field(description="The name of the prompt.")
    description: str | None = Field(
        default=None,
        description="A more detailed description of the prompt.",
    )
    generator_id: str | None = Field(
        default=None,
        description="The id of the generator that created this prompt.",
    )
    prompt: str = Field(
        description="The prompt for the task.",
        min_length=1,
    )
    chain_of_thought_instructions: str | None = Field(
        default=None,
        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting. COT will not be used unless this is provided.",
    )

A prompt for a task. This is the basic data storage format which can be used throughout a project.

The "Prompt" model name is reserved for the custom prompts parented by a task.

name: FilenameString
description: str | None
generator_id: str | None
prompt: str
chain_of_thought_instructions: str | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
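
A minimal construction sketch (field values are illustrative):

from kiln_ai.datamodel import BasePrompt

prompt = BasePrompt(
    name="Concise Summarizer",
    prompt="Summarize the user's text in one sentence.",
    # Optional: enables chain-of-thought style prompting.
    chain_of_thought_instructions="Think step by step before answering.",
)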

class DataSource(pydantic.main.BaseModel):
class DataSource(BaseModel):
    """
    Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

    Properties vary based on the source type - for synthetic/tool_call sources this includes
    model information, for human sources this includes creator information, for file imports
    this includes file information.
    """

    type: DataSourceType
    properties: Dict[str, str | int | float] = Field(
        default={},
        description="Properties describing the data source. For synthetic things like model. For human: the human's name. For file_import: file information.",
    )
    run_config: Optional[RunConfigProperties] = Field(
        default=None,
        description="The run config used to generate the data, if generated by running a model in Kiln (only true for type=synthetic).",
    )

    _data_source_properties = [
        DataSourceProperty(
            name="created_by",
            type=str,
            required_for=[DataSourceType.human],
            not_allowed_for=[
                DataSourceType.synthetic,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_provider",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="adapter_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # Legacy field -- allow loading from old runs, but we shouldn't be setting it.
            name="prompt_builder_name",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # The PromptId of the prompt. Can be a saved prompt, fine-tune, generator name, etc. See PromptId type for more details.
            name="prompt_id",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="file_name",
            type=str,
            required_for=[DataSourceType.file_import],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.synthetic,
                DataSourceType.tool_call,
            ],
        ),
    ]

    @model_validator(mode="after")
    def validate_type(self) -> "DataSource":
        if self.type not in DataSourceType:
            raise ValueError(f"Invalid data source type: {self.type}")
        return self

    @model_validator(mode="after")
    def validate_properties(self) -> "DataSource":
        for prop in self._data_source_properties:
            # Check the property type is correct
            if prop.name in self.properties:
                if not isinstance(self.properties[prop.name], prop.type):
                    raise ValueError(
                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
                    )
            # Check the property is required for the data source type
            if self.type in prop.required_for:
                if prop.name not in self.properties:
                    raise ValueError(
                        f"'{prop.name}' is required for {self.type} data source"
                    )
            # Check the property is not allowed for the data source type
            elif self.type in prop.not_allowed_for and prop.name in self.properties:
                raise ValueError(
                    f"'{prop.name}' is not allowed for {self.type} data source"
                )
        return self

    @model_validator(mode="after")
    def validate_no_empty_properties(self) -> Self:
        for prop, value in self.properties.items():
            if isinstance(value, str) and value == "":
                raise ValueError(
                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
                )
        return self

Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

Properties vary based on the source type - for synthetic/tool_call sources this includes model information, for human sources this includes creator information, for file imports this includes file information.

type: DataSourceType
properties: Dict[str, str | int | float]
run_config: Optional[kiln_ai.datamodel.run_config.RunConfigProperties]
@model_validator(mode='after')
def validate_type(self) -> DataSource:
@model_validator(mode='after')
def validate_properties(self) -> DataSource:
@model_validator(mode='after')
def validate_no_empty_properties(self) -> Self:
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.
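
A sketch of how the property rules above play out in practice (the model, provider, and adapter values are illustrative):

from kiln_ai.datamodel import DataSource, DataSourceType

# Per the rules above, synthetic sources must carry model_name,
# model_provider, and adapter_name.
synthetic = DataSource(
    type=DataSourceType.synthetic,
    properties={
        "model_name": "gpt-4o",
        "model_provider": "openai",
        "adapter_name": "kiln_adapter",
    },
)

# Human sources require created_by instead; model fields are not allowed.
human = DataSource(
    type=DataSourceType.human,
    properties={"created_by": "jane@example.com"},
)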

class DataSourceProperty(pydantic.main.BaseModel):
class DataSourceProperty(BaseModel):
    """
    Defines a property that can be associated with a data source.

    Includes validation rules for when properties are required or not allowed
    based on the data source type.
    """

    name: str
    type: Type[Union[str, int, float]]
    required_for: List[DataSourceType] = []
    not_allowed_for: List[DataSourceType] = []

Defines a property that can be associated with a data source.

Includes validation rules for when properties are required or not allowed based on the data source type.

name: str
type: Type[Union[str, int, float]]
required_for: List[DataSourceType]
not_allowed_for: List[DataSourceType]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
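
For example, the file_name rule that DataSource declares internally can be written with this model as:

from kiln_ai.datamodel import DataSourceProperty, DataSourceType

# Require a string file_name for file imports; forbid it for every other source type.
file_name_rule = DataSourceProperty(
    name="file_name",
    type=str,
    required_for=[DataSourceType.file_import],
    not_allowed_for=[
        DataSourceType.human,
        DataSourceType.synthetic,
        DataSourceType.tool_call,
    ],
)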

class DataSourceType(builtins.str, enum.Enum):
class DataSourceType(str, Enum):
    """
    The source type of a piece of data.

    Human: a human created the data
    Synthetic: a model created the data
    File import: the data was imported from a file
    Tool call: the data was produced by a tool call
    """

    human = "human"
    synthetic = "synthetic"
    file_import = "file_import"
    tool_call = "tool_call"

The source type of a piece of data.

Human: a human created the data. Synthetic: a model created the data. File import: the data was imported from a file. Tool call: the data was produced by a tool call.

human = <DataSourceType.human: 'human'>
synthetic = <DataSourceType.synthetic: 'synthetic'>
file_import = <DataSourceType.file_import: 'file_import'>
tool_call = <DataSourceType.tool_call: 'tool_call'>
class DatasetSplit(kiln_ai.datamodel.basemodel.KilnParentedModel):
class DatasetSplit(KilnParentedModel):
    """
    A collection of task runs, with optional splits (train, test, validation).

    Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

    Maintains a list of IDs for each split, to avoid data duplication.
    """

    name: FilenameString = Field(description="The name of the dataset split.")
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    splits: list[DatasetSplitDefinition] = Field(
        default_factory=list,
        description="The splits in the dataset.",
    )
    split_contents: dict[str, list[str]] = Field(
        description="The contents of each split in the dataset. The key is the split name, and the value is a list of task run IDs.",
    )
    filter: DatasetFilterId | None = Field(
        default=None,
        description="The filter used to build the dataset.",
    )

    @model_validator(mode="after")
    def validate_split_percentages(self) -> "DatasetSplit":
        total = sum(split.percentage for split in self.splits)
        if not math.isclose(total, 1.0, rel_tol=1e-9):
            raise ValueError(f"The sum of split percentages must be 1.0 (got {total})")
        return self

    @classmethod
    def from_task(
        cls,
        name: str,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter_id: DatasetFilterId = "all",
        description: str | None = None,
    ):
        """
        Build a dataset split from a task.
        """
        filter = dataset_filter_from_id(filter_id)
        split_contents = cls.build_split_contents(task, splits, filter)
        return cls(
            parent=task,
            name=name,
            description=description,
            splits=splits,
            split_contents=split_contents,
            filter=filter_id,
        )

    @classmethod
    def build_split_contents(
        cls,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter: DatasetFilter,
    ) -> dict[str, list[str]]:
        valid_ids = []
        for task_run in task.runs():
            if filter(task_run):
                valid_ids.append(task_run.id)

        # Shuffle and split by split percentage
        random.shuffle(valid_ids)
        split_contents = {}
        start_idx = 0
        remaining_items = len(valid_ids)

        # Handle all splits except the last one
        for split in splits[:-1]:
            split_size = round(len(valid_ids) * split.percentage)
            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
            start_idx += split_size
            remaining_items -= split_size

        # Last split gets all remaining items (for rounding)
        if splits:
            split_contents[splits[-1].name] = valid_ids[start_idx:]

        return split_contents

    def parent_task(self) -> "Task | None":
        # inline import to avoid circular import
        from kiln_ai.datamodel import Task

        if not isinstance(self.parent, Task):
            return None
        return self.parent

    def missing_count(self) -> int:
        """
        Returns:
            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
        """
        parent = self.parent_task()
        if parent is None:
            raise ValueError("DatasetSplit has no parent task")

        runs = parent.runs(readonly=True)
        all_ids = set(run.id for run in runs)
        all_ids_in_splits = set()
        for ids in self.split_contents.values():
            all_ids_in_splits.update(ids)
        missing = all_ids_in_splits - all_ids
        return len(missing)

A collection of task runs, with optional splits (train, test, validation).

Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

Maintains a list of IDs for each split, to avoid data duplication.

name: FilenameString
description: str | None
splits: list[DatasetSplitDefinition]
split_contents: dict[str, list[str]]
filter: DatasetFilterId | None
@model_validator(mode='after')
def validate_split_percentages(self) -> DatasetSplit:
@classmethod
def from_task(cls, name: str, task: Task, splits: list[DatasetSplitDefinition], filter_id: DatasetFilterId = 'all', description: str | None = None):

Build a dataset split from a task.

@classmethod
def build_split_contents(cls, task: Task, splits: list[DatasetSplitDefinition], filter: kiln_ai.datamodel.dataset_filters.DatasetFilter) -> dict[str, list[str]]:
def parent_task(self) -> Task | None:
def missing_count(self) -> int:

Returns: int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset

def relationship_name() -> str:
        def relationship_name_method() -> str:
            return relationship_name

Returns the name of the relationship between this model and its parent (generated by KilnParentModel).

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
        def parent_class_method() -> Type[KilnParentModel]:
            return cls

Returns the parent model type for this child model (generated by KilnParentModel).

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.
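
A sketch of freezing a task's runs into an 80/20 split (assumes `task` is an existing Task with saved runs; the split names mirror the example in DatasetSplitDefinition below):

from kiln_ai.datamodel import DatasetSplit, DatasetSplitDefinition

split = DatasetSplit.from_task(
    name="v1_split",
    task=task,
    splits=[
        DatasetSplitDefinition(name="train", percentage=0.8),
        DatasetSplitDefinition(name="test", percentage=0.2),
    ],
)
split.save_to_file()

# Later: how many frozen run IDs no longer exist in the dataset?
print(split.missing_count())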

class DatasetSplitDefinition(pydantic.main.BaseModel):
class DatasetSplitDefinition(BaseModel):
    """
    A definition of a split in a dataset.

    Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
    """

    name: FilenameString = Field(
        description="The name of the dataset split definition."
    )
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    percentage: float = Field(
        ge=0.0,
        le=1.0,
        description="The percentage of the dataset that this split represents (between 0 and 1).",
    )

A definition of a split in a dataset.

Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)

name: FilenameString
description: str | None
percentage: float
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class ExternalToolServer(kiln_ai.datamodel.basemodel.KilnParentedModel):
class ExternalToolServer(KilnParentedModel):
    """
    Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.

    This model stores the necessary configuration to connect to and authenticate with
    external MCP servers that provide tools for LLM interactions.
    """

    name: FilenameString = Field(description="The name of the external tool.")
    type: ToolServerType = Field(
        description="The type of external tool server. Remote tools are hosted on a remote server",
    )
    description: str | None = Field(
        default=None,
        description="A description of the external tool for you and your team. Will not be used in prompts/training/validation.",
    )

    properties: (
        LocalServerProperties | RemoteServerProperties | KilnTaskServerProperties
    ) = Field(
        description="Configuration properties specific to the tool type.",
    )

    # Private variable to store unsaved secrets
    _unsaved_secrets: dict[str, str] = PrivateAttr(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        # Process secrets after initialization (pydantic v2 hook)
        self._process_secrets_from_properties()

    def _process_secrets_from_properties(self) -> None:
        """
        Extract secrets from properties and move them to _unsaved_secrets.
        This removes secrets from the properties dict so they aren't saved to file.
        Clears existing _unsaved_secrets first to handle property updates correctly.
        """
        # Clear existing unsaved secrets since we're reprocessing
        self._unsaved_secrets.clear()

        secret_keys = self.get_secret_keys()

        if not secret_keys:
            return

        # Extract secret values from properties based on server type
        match self.type:
            case ToolServerType.remote_mcp:
                headers = self.properties.get("headers", {})
                for key_name in secret_keys:
                    if key_name in headers:
                        self._unsaved_secrets[key_name] = headers[key_name]
                        # Remove from headers immediately so they are not saved to file
                        del headers[key_name]

            case ToolServerType.local_mcp:
                env_vars = self.properties.get("env_vars", {})
                for key_name in secret_keys:
                    if key_name in env_vars:
                        self._unsaved_secrets[key_name] = env_vars[key_name]
                        # Remove from env_vars immediately so they are not saved to file
                        del env_vars[key_name]

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(self.type)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Override __setattr__ to process secrets whenever properties are updated.
        """
        super().__setattr__(name, value)

        # Process secrets whenever properties are updated
        if name == "properties":
            self._process_secrets_from_properties()

    # Validation Helpers

    @classmethod
    def check_server_url(cls, server_url: str) -> None:
        """Validate Server URL"""
        if not isinstance(server_url, str):
            raise ValueError("Server URL must be a string")

        # Check for leading whitespace in URL
        if server_url != server_url.lstrip():
            raise ValueError("Server URL must not have leading whitespace")

        parsed_url = urlparse(server_url)
        if not parsed_url.netloc:
            raise ValueError("Server URL is not a valid URL")
        if parsed_url.scheme not in ["http", "https"]:
            raise ValueError("Server URL must start with http:// or https://")

    @classmethod
    def check_headers(cls, headers: dict) -> None:
        """Validate Headers"""
        if not isinstance(headers, dict):
            raise ValueError("headers must be a dictionary")

        for key, value in headers.items():
            if not key:
                raise ValueError("Header name is required")
            if not value:
                raise ValueError("Header value is required")

            # Reject invalid header names and CR/LF in names/values
            token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
            if not token_re.match(key):
                raise ValueError(f'Invalid header name: "{key}"')
            if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
                raise ValueError(
                    "Header names/values must not contain invalid characters"
                )

    @classmethod
    def check_secret_keys(
        cls, secret_keys: list, key_type: str, tool_type: str
    ) -> None:
        """Validate Secret Keys (generic method for both header and env var keys)"""
        if not isinstance(secret_keys, list):
            raise ValueError(
                f"{key_type} must be a list for external tools of type '{tool_type}'"
            )
        if not all(isinstance(k, str) for k in secret_keys):
            raise ValueError(f"{key_type} must contain only strings")
        if not all(key for key in secret_keys):
            raise ValueError("Secret key is required")

    @classmethod
    def check_env_vars(cls, env_vars: dict) -> None:
        """Validate Environment Variables"""
        if not isinstance(env_vars, dict):
            raise ValueError("environment variables must be a dictionary")

        # Validate env_vars keys are in the correct format for Environment Variables
        # According to POSIX specification, environment variable names must:
        # - Start with a letter (a-z, A-Z) or underscore (_)
        # - Contain only ASCII letters, digits, and underscores
        for key, _ in env_vars.items():
            if not key or not (
                key[0].isascii() and (key[0].isalpha() or key[0] == "_")
            ):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Must start with a letter or underscore."
                )

            if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
                )

    @classmethod
    def type_from_data(cls, data: dict) -> ToolServerType:
        """Get the tool server type from the data for the validators"""
        raw_type = data.get("type")
        if raw_type is None:
            raise ValueError("type is required")
        try:
            return ToolServerType(raw_type)
        except ValueError:
            valid_types = ", ".join(type.value for type in ToolServerType)
            raise ValueError(f"type must be one of: {valid_types}")

    @model_validator(mode="before")
    def validate_required_fields(cls, data: dict) -> dict:
        """Validate that each tool type has the required configuration."""
        server_type = ExternalToolServer.type_from_data(data)
        properties = data.get("properties", {})

        match server_type:
            case ToolServerType.remote_mcp:
                server_url = properties.get("server_url", None)
                if server_url is None:
                    raise ValueError(
                        "Server URL is required to connect to a remote MCP server"
                    )
                ExternalToolServer.check_server_url(server_url)

            case ToolServerType.local_mcp:
                command = properties.get("command", None)
                if command is None:
                    raise ValueError("command is required to start a local MCP server")
                if not isinstance(command, str):
                    raise ValueError(
                        "command must be a string to start a local MCP server"
                    )
                # Reject empty/whitespace-only command strings
                if command.strip() == "":
                    raise ValueError("command must be a non-empty string")

                args = properties.get("args", None)
                if args is not None:
                    if not isinstance(args, list):
                        raise ValueError(
                            "arguments must be a list to start a local MCP server"
                        )

            case ToolServerType.kiln_task:
                tool_name_validator(properties.get("name", ""))
                err_msg_prefix = "Kiln task server properties:"
                validate_return_dict_prop(
                    properties, "description", str, err_msg_prefix
                )
                description = properties.get("description", "")
                if len(description) > 128:
                    raise ValueError("description must be 128 characters or less")
                validate_return_dict_prop(
                    properties, "is_archived", bool, err_msg_prefix
                )
                validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
                validate_return_dict_prop(
                    properties, "run_config_id", str, err_msg_prefix
                )

            case _:
                # Type checking will catch missing cases
                raise_exhaustive_enum_error(server_type)
        return data

    @model_validator(mode="before")
    def validate_headers_and_env_vars(cls, data: dict) -> dict:
        """
        Validate secrets. These need to be validated before model initialization because secrets will be processed and stripped.
        """
        type = ExternalToolServer.type_from_data(data)

        properties = data.get("properties", {})
        if properties is None:
            raise ValueError("properties is required")

        match type:
            case ToolServerType.remote_mcp:
                # Validate headers
                headers = properties.get("headers", None)
                if headers is not None:
                    ExternalToolServer.check_headers(headers)

                # Secret header keys are optional, validate if they are set
                secret_header_keys = properties.get("secret_header_keys", None)
                if secret_header_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_header_keys, "secret_header_keys", "remote_mcp"
                    )

            case ToolServerType.local_mcp:
                # Validate secret environment variable keys
                env_vars = properties.get("env_vars", {})
                if env_vars is not None:
                    ExternalToolServer.check_env_vars(env_vars)

                # Secret env var keys are optional, but if they are set, they must be a list of strings
                secret_env_var_keys = properties.get("secret_env_var_keys", None)
                if secret_env_var_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_env_var_keys, "secret_env_var_keys", "local_mcp"
                    )

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(type)

        return data

    def get_secret_keys(self) -> list[str]:
        """
        Get the list of secret key names based on server type.

        Returns:
            List of secret key names (header names for remote, env var names for local)
        """
        match self.type:
            case ToolServerType.remote_mcp:
                return self.properties.get("secret_header_keys", [])
            case ToolServerType.local_mcp:
                return self.properties.get("secret_env_var_keys", [])
            case ToolServerType.kiln_task:
                return []
            case _:
                raise_exhaustive_enum_error(self.type)

    def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
        """
        Retrieve secrets from configuration system or in-memory storage.
        Automatically determines which secret keys to retrieve based on the server type.
        Config secrets take precedence over unsaved secrets.

        Returns:
            Tuple of (secrets_dict, missing_secrets_list) where:
            - secrets_dict: Dictionary mapping key names to their secret values
            - missing_secrets_list: List of secret key names that are missing values
        """
        secrets = {}
        missing_secrets = []
        secret_keys = self.get_secret_keys()

        if secret_keys and len(secret_keys) > 0:
            config = Config.shared()
            mcp_secrets = config.get_value(MCP_SECRETS_KEY)

            for key_name in secret_keys:
                secret_value = None

                # First check config secrets (persistent storage), key is mcp_server_id::key_name
                secret_key = self._config_secret_key(key_name)
                secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None

                # Fall back to unsaved secrets (in-memory storage)
                if (
                    not secret_value
                    and hasattr(self, "_unsaved_secrets")
                    and key_name in self._unsaved_secrets
                ):
                    secret_value = self._unsaved_secrets[key_name]

                if secret_value:
                    secrets[key_name] = secret_value
                else:
                    missing_secrets.append(key_name)

        return secrets, missing_secrets

    def _save_secrets(self) -> None:
        """
        Save unsaved secrets to the configuration system.
        """
        secret_keys = self.get_secret_keys()

        # No secrets to save
        if not secret_keys:
            return

        if self.id is None:
            raise ValueError("Server ID cannot be None when saving secrets")

        # Check if secrets are already saved
        if not hasattr(self, "_unsaved_secrets") or not self._unsaved_secrets:
            return

        config = Config.shared()
        mcp_secrets: dict[str, str] = config.get_value(MCP_SECRETS_KEY) or {}

        # Store secrets with the pattern: mcp_server_id::key_name
        for key_name, secret_value in self._unsaved_secrets.items():
            secret_key = self._config_secret_key(key_name)
            mcp_secrets[secret_key] = secret_value

        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

        # Clear unsaved secrets after saving
        self._unsaved_secrets.clear()

    def delete_secrets(self) -> None:
        """
        Delete all secrets for this tool server from the configuration system.
        """
        secret_keys = self.get_secret_keys()

        config = Config.shared()
        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()

        # Remove secrets with the pattern: mcp_server_id::key_name
        for key_name in secret_keys:
            secret_key = self._config_secret_key(key_name)
            if secret_key in mcp_secrets:
                del mcp_secrets[secret_key]

        # Always call update_settings to maintain consistency with the old behavior
        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

    def save_to_file(self) -> None:
        """
        Override save_to_file to automatically save any unsaved secrets before saving to file.

        This ensures that secrets are always saved when the object is saved,
        preventing the issue where secrets could be lost if save_to_file is called
        without explicitly saving secrets first.
        """
        # Save any unsaved secrets first
        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
            self._save_secrets()

        # Call the parent save_to_file method
        super().save_to_file()

    #  Internal helpers

    def _config_secret_key(self, key_name: str) -> str:
        """
        Generate the secret key pattern for storing/retrieving secrets.

        Args:
            key_name: The name of the secret key

        Returns:
            The formatted secret key: "{server_id}::{key_name}"
        """
        return f"{self.id}::{key_name}"

Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.

This model stores the necessary configuration to connect to and authenticate with external MCP servers that provide tools for LLM interactions.

name: FilenameString
type: kiln_ai.datamodel.external_tool_server.ToolServerType
description: str | None
properties: kiln_ai.datamodel.external_tool_server.LocalServerProperties | kiln_ai.datamodel.external_tool_server.RemoteServerProperties | kiln_ai.datamodel.external_tool_server.KilnTaskServerProperties
def model_post_init(self, __context: Any) -> None:

Processes secrets after initialization (pydantic v2 post-init hook). Secret values are moved out of `properties` into in-memory storage so they are not written to file.
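
A sketch of the effect (the URL and header values are illustrative, and this assumes ToolServerType.remote_mcp serializes as "remote_mcp"):

from kiln_ai.datamodel import ExternalToolServer

server = ExternalToolServer(
    name="weather_tools",
    type="remote_mcp",
    properties={
        "server_url": "https://mcp.example.com",
        "headers": {"Authorization": "Bearer sk-secret"},
        "secret_header_keys": ["Authorization"],
    },
)

# The secret header was stripped from properties at init; it now lives only
# in the private _unsaved_secrets store until save_to_file() persists it.
assert "Authorization" not in server.properties.get("headers", {})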

@classmethod
def check_server_url(cls, server_url: str) -> None:

Validate Server URL

@classmethod
def check_headers(cls, headers: dict) -> None:

Validate Headers

@classmethod
def check_secret_keys(cls, secret_keys: list, key_type: str, tool_type: str) -> None:

Validate Secret Keys (generic method for both header and env var keys)

@classmethod
def check_env_vars(cls, env_vars: dict) -> None:

Validate Environment Variables

@classmethod
def type_from_data(cls, data: dict) -> kiln_ai.datamodel.external_tool_server.ToolServerType:

Get the tool server type from the data for the validators.

@model_validator(mode='before')
def validate_required_fields(cls, data: dict) -> dict:

Validate that each tool type has the required configuration.
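
For comparison, a sketch of a local server config that satisfies these checks (the command, arguments, and env var are illustrative):

local_server = ExternalToolServer(
    name="filesystem_tools",
    type="local_mcp",
    properties={
        "command": "npx",
        "args": ["-y", "@modelcontextprotocol/server-filesystem", "/tmp"],
        "env_vars": {"LOG_LEVEL": "debug"},
    },
)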

@model_validator(mode='before')
def validate_headers_and_env_vars(cls, data: dict) -> dict:

Validate secrets. These need to be validated before model initialization because secrets will be processed and stripped.

def get_secret_keys(self) -> list[str]:

Get the list of secret key names based on server type.

Returns: List of secret key names (header names for remote, env var names for local)

def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:

Retrieve secrets from configuration system or in-memory storage. Automatically determines which secret keys to retrieve based on the server type. Config secrets take precedence over unsaved secrets.

Returns: Tuple of (secrets_dict, missing_secrets_list) where:

  • secrets_dict: Dictionary mapping key names to their secret values
  • missing_secrets_list: List of secret key names that are missing values
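
A minimal usage sketch of the secret-resolution flow above. It assumes an already-constructed ExternalToolServer instance; report_missing_secrets is a hypothetical helper, not part of the datamodel.

    from kiln_ai.datamodel import ExternalToolServer

    def report_missing_secrets(server: ExternalToolServer) -> None:
        # Key names come from secret_header_keys (remote_mcp) or
        # secret_env_var_keys (local_mcp), depending on server.type.
        keys = server.get_secret_keys()

        # Config-stored secrets take precedence over unsaved in-memory ones.
        secrets, missing = server.retrieve_secrets()
        for key in keys:
            print(f"{key}: {'ok' if key in secrets else 'MISSING'}")
        if missing:
            raise RuntimeError(f"Missing secret values for: {missing}")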

def delete_secrets(self) -> None:
406    def delete_secrets(self) -> None:
407        """
408        Delete all secrets for this tool server from the configuration system.
409        """
410        secret_keys = self.get_secret_keys()
411
412        config = Config.shared()
413        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()
414
415        # Remove secrets with the pattern: mcp_server_id::key_name
416        for key_name in secret_keys:
417            secret_key = self._config_secret_key(key_name)
418            if secret_key in mcp_secrets:
419                del mcp_secrets[secret_key]
420
421        # Always call update_settings to maintain consistency with the old behavior
422        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

Delete all secrets for this tool server from the configuration system.

def save_to_file(self) -> None:
424    def save_to_file(self) -> None:
425        """
426        Override save_to_file to automatically save any unsaved secrets before saving to file.
427
428        This ensures that secrets are always saved when the object is saved,
429        preventing the issue where secrets could be lost if save_to_file is called
430        without explicitly saving secrets first.
431        """
432        # Save any unsaved secrets first
433        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
434            self._save_secrets()
435
436        # Call the parent save_to_file method
437        super().save_to_file()

Override save_to_file to automatically save any unsaved secrets before saving to file.

This ensures that secrets are always saved when the object is saved, preventing the issue where secrets could be lost if save_to_file is called without explicitly saving secrets first.
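
A short, hedged sketch of the lifecycle implied above, assuming `server` is an ExternalToolServer ready to be saved:

    # save_to_file() persists unsaved in-memory secrets to the config
    # before writing the model file, so secrets are not lost:
    server.save_to_file()

    # Later, remove this server's stored secrets (config entries keyed
    # "<server_id>::<key_name>", per the comments in the source above):
    server.delete_secrets()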

def relationship_name() -> str:
661        def relationship_name_method() -> str:
662            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
654        def parent_class_method() -> Type[KilnParentModel]:
655            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class FineTuneStatusType(builtins.str, enum.Enum):
49class FineTuneStatusType(str, Enum):
50    """
51    The status type of a fine-tune (running, completed, failed, etc).
52    """
53
54    unknown = "unknown"  # server error
55    pending = "pending"
56    running = "running"
57    completed = "completed"
58    failed = "failed"

The status type of a fine-tune (running, completed, failed, etc).

unknown = <FineTuneStatusType.unknown: 'unknown'>
pending = <FineTuneStatusType.pending: 'pending'>
running = <FineTuneStatusType.running: 'running'>
completed = <FineTuneStatusType.completed: 'completed'>
failed = <FineTuneStatusType.failed: 'failed'>
class Finetune(kiln_ai.datamodel.basemodel.KilnParentedModel):
 23class Finetune(KilnParentedModel):
 24    """
 25    The Kiln fine-tune datamodel.
 26
 27    Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
 28    """
 29
 30    name: FilenameString = Field(description="The name of the fine-tune.")
 31    description: str | None = Field(
 32        default=None,
 33        description="A description of the fine-tune for you and your team. Not used in training.",
 34    )
 35    structured_output_mode: StructuredOutputMode | None = Field(
 36        default=None,
 37        description="The mode to use to train the model for structured output, if it was trained with structured output. Will determine how we call the tuned model, so we call with the matching mode.",
 38    )
 39    provider: str = Field(
 40        description="The provider to use for the fine-tune (e.g. 'openai')."
 41    )
 42    base_model_id: str = Field(
 43        description="The id of the base model to use for the fine-tune. This string relates to the provider's IDs for their own models, not Kiln IDs."
 44    )
 45    provider_id: str | None = Field(
 46        default=None,
 47        description="The ID of the fine-tune job on the provider's side. May not be the same as the fine_tune_model_id.",
 48    )
 49    fine_tune_model_id: str | None = Field(
 50        default=None,
 51        description="The ID of the fine-tuned model on the provider's side. May not be the same as the provider_id.",
 52    )
 53    dataset_split_id: str = Field(
 54        description="The ID of the dataset split to use for this fine-tune.",
 55    )
 56    train_split_name: str = Field(
 57        default="train",
 58        description="The name of the training split to use for this fine-tune.",
 59    )
 60    validation_split_name: str | None = Field(
 61        default=None,
 62        description="The name of the validation split to use for this fine-tune. Optional.",
 63    )
 64    parameters: dict[str, str | int | float | bool] = Field(
 65        default={},
 66        description="The parameters to use for this fine-tune. These are provider-specific.",
 67    )
 68    # These two fields are saved exactly as used for training. Even if they map exactly to a custom prompt or generator, those can change, so we want to keep a record of the training prompt.
 69    system_message: str = Field(
 70        description="The system message to use for this fine-tune.",
 71    )
 72    thinking_instructions: str | None = Field(
 73        default=None,
 74        description="The thinking instructions to use for this fine-tune. Only used when data_strategy is final_and_intermediate.",
 75    )
 76    latest_status: FineTuneStatusType = Field(
 77        default=FineTuneStatusType.unknown,
 78        description="The latest known status of this fine-tune. Not updated in real time.",
 79    )
 80    properties: Dict[str, str | int | float] = Field(
 81        default={},
 82        description="Properties of the fine-tune. Different providers may use different properties.",
 83    )
 84    data_strategy: ChatStrategy = Field(
 85        default=ChatStrategy.single_turn,
 86        description="The strategy to use for training the model. 'final_only' will only train on the final response. 'final_and_intermediate' will train on the final response and intermediate outputs (chain of thought or reasoning).",
 87    )
 88
 89    # Workaround to return typed parent without importing Task
 90    def parent_task(self) -> Union["Task", None]:
 91        if self.parent is None or self.parent.__class__.__name__ != "Task":
 92            return None
 93        return self.parent  # type: ignore
 94
 95    @model_validator(mode="after")
 96    def validate_thinking_instructions(self) -> Self:
 97        if (
 98            self.thinking_instructions is not None
 99            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
100        ):
101            raise ValueError(
102                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
103            )
104        if (
105            self.thinking_instructions is None
106            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
107        ):
108            raise ValueError(
109                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
110            )
111        return self

The Kiln fine-tune datamodel.

Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
structured_output_mode: StructuredOutputMode | None
provider: str
base_model_id: str
provider_id: str | None
fine_tune_model_id: str | None
dataset_split_id: str
train_split_name: str
validation_split_name: str | None
parameters: dict[str, str | int | float | bool]
system_message: str
thinking_instructions: str | None
latest_status: FineTuneStatusType
properties: Dict[str, str | int | float]
data_strategy: kiln_ai.datamodel.datamodel_enums.ChatStrategy
def parent_task(self) -> Optional[Task]:
90    def parent_task(self) -> Union["Task", None]:
91        if self.parent is None or self.parent.__class__.__name__ != "Task":
92            return None
93        return self.parent  # type: ignore
@model_validator(mode='after')
def validate_thinking_instructions(self) -> Self:
 95    @model_validator(mode="after")
 96    def validate_thinking_instructions(self) -> Self:
 97        if (
 98            self.thinking_instructions is not None
 99            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
100        ):
101            raise ValueError(
102                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
103            )
104        if (
105            self.thinking_instructions is None
106            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
107        ):
108            raise ValueError(
109                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
110            )
111        return self
def relationship_name() -> str:
661        def relationship_name_method() -> str:
662            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
654        def parent_class_method() -> Type[KilnParentModel]:
655            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.
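
A hedged construction sketch for Finetune. The IDs are illustrative, and it assumes ChatStrategy.single_turn is not one of the strategies that require thinking instructions (per the validator above):

    from kiln_ai.datamodel import Finetune
    from kiln_ai.datamodel.datamodel_enums import ChatStrategy

    finetune = Finetune(
        name="tune_v1",
        provider="openai",
        base_model_id="gpt-4o-mini-2024-07-18",  # provider's model ID, illustrative
        dataset_split_id="split_123",            # illustrative ID
        system_message="You are a helpful assistant.",
        data_strategy=ChatStrategy.single_turn,  # no thinking instructions needed
    )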

class Priority(enum.IntEnum):
 5class Priority(IntEnum):
 6    """Defines priority levels for tasks and requirements, where P0 is highest priority."""
 7
 8    p0 = 0
 9    p1 = 1
10    p2 = 2
11    p3 = 3

Defines priority levels for tasks and requirements, where P0 is highest priority.

p0 = <Priority.p0: 0>
p1 = <Priority.p1: 1>
p2 = <Priority.p2: 2>
p3 = <Priority.p3: 3>
class Project(kiln_ai.datamodel.basemodel.KilnParentModel):
14class Project(
15    KilnParentModel,
16    parent_of={
17        "tasks": Task,
18        "documents": Document,
19        "extractor_configs": ExtractorConfig,
20        "chunker_configs": ChunkerConfig,
21        "embedding_configs": EmbeddingConfig,
22        "rag_configs": RagConfig,
23        "vector_store_configs": VectorStoreConfig,
24        "external_tool_servers": ExternalToolServer,
25    },
26):
27    """
28    A collection of related tasks.
29
30    Projects organize tasks into logical groups and provide high-level descriptions
31    of the overall goals.
32    """
33
34    name: FilenameString = Field(description="The name of the project.")
35    description: str | None = Field(
36        default=None,
37        description="A description of the project for you and your team. Will not be used in prompts/training/validation.",
38    )
39
40    # Needed for typechecking. We should fix this in KilnParentModel
41    def tasks(self) -> list[Task]:
42        return super().tasks()  # type: ignore
43
44    def documents(self, readonly: bool = False) -> list[Document]:
45        return super().documents(readonly=readonly)  # type: ignore
46
47    def extractor_configs(self, readonly: bool = False) -> list[ExtractorConfig]:
48        return super().extractor_configs(readonly=readonly)  # type: ignore
49
50    def chunker_configs(self, readonly: bool = False) -> list[ChunkerConfig]:
51        return super().chunker_configs(readonly=readonly)  # type: ignore
52
53    def embedding_configs(self, readonly: bool = False) -> list[EmbeddingConfig]:
54        return super().embedding_configs(readonly=readonly)  # type: ignore
55
56    def vector_store_configs(self, readonly: bool = False) -> list[VectorStoreConfig]:
57        return super().vector_store_configs(readonly=readonly)  # type: ignore
58
59    def rag_configs(self, readonly: bool = False) -> list[RagConfig]:
60        return super().rag_configs(readonly=readonly)  # type: ignore
61
62    def external_tool_servers(self, readonly: bool = False) -> list[ExternalToolServer]:
63        return super().external_tool_servers(readonly=readonly)  # type: ignore

A collection of related tasks.

Projects organize tasks into logical groups and provide high-level descriptions of the overall goals.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
def tasks(self, readonly=False) -> List[Task]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def documents(self, readonly=False) -> List[kiln_ai.datamodel.extraction.Document]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def extractor_configs( self, readonly=False) -> List[kiln_ai.datamodel.extraction.ExtractorConfig]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def chunker_configs(self, readonly=False) -> List[kiln_ai.datamodel.chunk.ChunkerConfig]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def embedding_configs( self, readonly=False) -> List[kiln_ai.datamodel.embedding.EmbeddingConfig]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def vector_store_configs( self, readonly=False) -> List[kiln_ai.datamodel.vector_store.VectorStoreConfig]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def rag_configs(self, readonly=False) -> List[kiln_ai.datamodel.rag.RagConfig]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def external_tool_servers( self, readonly=False) -> List[ExternalToolServer]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
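
A hedged sketch of creating a project and reading its children. It assumes the `path` field inherited from the Kiln base model points at the project file location; the path below is illustrative:

    from pathlib import Path
    from kiln_ai.datamodel import Project

    project = Project(
        name="demo_project",
        description="Internal evaluation tasks.",
        path=Path("/tmp/demo_project/project.kiln"),  # hypothetical location
    )
    project.save_to_file()

    # Child accessors (tasks, documents, rag_configs, ...) load children
    # from disk relative to the project file.
    for task in project.tasks():
        print(task.name)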

class Prompt(kiln_ai.datamodel.basemodel.KilnParentedModel, kiln_ai.datamodel.BasePrompt):
33class Prompt(KilnParentedModel, BasePrompt):
34    """
35    A prompt for a task. This is the custom prompt parented by a task.
36    """
37
38    pass

A prompt for a task. This is the custom prompt parented by a task.

def relationship_name() -> str:
661        def relationship_name_method() -> str:
662            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
654        def parent_class_method() -> Type[KilnParentModel]:
655            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
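
An illustrative custom prompt; the fields come from BasePrompt, and the instance would be parented by a Task before saving (a hedged sketch, not a full persistence example):

    from kiln_ai.datamodel import Prompt

    prompt = Prompt(
        name="concise_v1",
        prompt="Answer in one short paragraph.",
        chain_of_thought_instructions=None,  # COT disabled unless provided
    )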

class PromptGenerators(builtins.str, enum.Enum):
 9class PromptGenerators(str, Enum):
10    SIMPLE = "simple_prompt_builder"
11    MULTI_SHOT = "multi_shot_prompt_builder"
12    FEW_SHOT = "few_shot_prompt_builder"
13    REPAIRS = "repairs_prompt_builder"
14    SIMPLE_CHAIN_OF_THOUGHT = "simple_chain_of_thought_prompt_builder"
15    FEW_SHOT_CHAIN_OF_THOUGHT = "few_shot_chain_of_thought_prompt_builder"
16    MULTI_SHOT_CHAIN_OF_THOUGHT = "multi_shot_chain_of_thought_prompt_builder"
17    SHORT = "short_prompt_builder"

Enumeration of the built-in prompt generator IDs.

SIMPLE = <PromptGenerators.SIMPLE: 'simple_prompt_builder'>
MULTI_SHOT = <PromptGenerators.MULTI_SHOT: 'multi_shot_prompt_builder'>
FEW_SHOT = <PromptGenerators.FEW_SHOT: 'few_shot_prompt_builder'>
REPAIRS = <PromptGenerators.REPAIRS: 'repairs_prompt_builder'>
SIMPLE_CHAIN_OF_THOUGHT = <PromptGenerators.SIMPLE_CHAIN_OF_THOUGHT: 'simple_chain_of_thought_prompt_builder'>
FEW_SHOT_CHAIN_OF_THOUGHT = <PromptGenerators.FEW_SHOT_CHAIN_OF_THOUGHT: 'few_shot_chain_of_thought_prompt_builder'>
MULTI_SHOT_CHAIN_OF_THOUGHT = <PromptGenerators.MULTI_SHOT_CHAIN_OF_THOUGHT: 'multi_shot_chain_of_thought_prompt_builder'>
SHORT = <PromptGenerators.SHORT: 'short_prompt_builder'>
PromptId = typing.Annotated[str, AfterValidator(func=<function <lambda>>)]
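
Illustrative use of the generator enum and the exported prompt_generator_values list; PromptId itself is the validated string type shown above (a sketch, assuming generator IDs are valid PromptId values):

    from kiln_ai.datamodel import PromptGenerators, prompt_generator_values

    print(PromptGenerators.SIMPLE.value)  # "simple_prompt_builder"
    print(prompt_generator_values)        # all built-in generator IDs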
class RequirementRating(pydantic.main.BaseModel):
20class RequirementRating(BaseModel):
21    """Rating for a specific requirement within a task output."""
22
23    value: float = Field(
24        description="The rating value. Interpretation depends on rating type"
25    )
26    type: TaskOutputRatingType = Field(description="The type of rating")

Rating for a specific requirement within a task output.

value: float
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class StructuredOutputMode(builtins.str, enum.Enum):
24class StructuredOutputMode(str, Enum):
25    """
26    Enumeration of supported structured output modes.
27
28    - json_schema: request json using API capabilities for json_schema
29    - function_calling: request json using API capabilities for function calling
30    - json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema
31    - json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings.
32    - json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
33    - json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
34    - default: let the adapter decide (legacy, do not use for new use cases)
35    - unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should lookup best option at runtime.
36    """
37
38    default = "default"
39    json_schema = "json_schema"
40    function_calling_weak = "function_calling_weak"
41    function_calling = "function_calling"
42    json_mode = "json_mode"
43    json_instructions = "json_instructions"
44    json_instruction_and_object = "json_instruction_and_object"
45    json_custom_instructions = "json_custom_instructions"
46    unknown = "unknown"

Enumeration of supported structured output modes.

  • json_schema: request json using API capabilities for json_schema
  • function_calling: request json using API capabilities for function calling
  • json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema
  • json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings.
  • json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
  • json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
  • default: let the adapter decide (legacy, do not use for new use cases)
  • unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should lookup best option at runtime.
default = <StructuredOutputMode.default: 'default'>
json_schema = <StructuredOutputMode.json_schema: 'json_schema'>
function_calling_weak = <StructuredOutputMode.function_calling_weak: 'function_calling_weak'>
function_calling = <StructuredOutputMode.function_calling: 'function_calling'>
json_mode = <StructuredOutputMode.json_mode: 'json_mode'>
json_instructions = <StructuredOutputMode.json_instructions: 'json_instructions'>
json_instruction_and_object = <StructuredOutputMode.json_instruction_and_object: 'json_instruction_and_object'>
json_custom_instructions = <StructuredOutputMode.json_custom_instructions: 'json_custom_instructions'>
unknown = <StructuredOutputMode.unknown: 'unknown'>
class Task(kiln_ai.datamodel.basemodel.KilnParentedModel, kiln_ai.datamodel.basemodel.KilnParentModel):
 99class Task(
100    KilnParentedModel,
101    KilnParentModel,
102    parent_of={
103        "runs": TaskRun,
104        "dataset_splits": DatasetSplit,
105        "finetunes": Finetune,
106        "prompts": Prompt,
107        "evals": Eval,
108        "run_configs": TaskRunConfig,
109    },
110):
111    """
112    Represents a specific task to be performed, with associated requirements and validation rules.
113
114    Contains the task definition, requirements, input/output schemas, and maintains
115    a collection of task runs.
116    """
117
118    name: FilenameString = Field(description="The name of the task.")
119    description: str | None = Field(
120        default=None,
121        description="A description of the task for you and your team. Will not be used in prompts/training/validation.",
122    )
123    instruction: str = Field(
124        min_length=1,
125        description="The instructions for the task. Will be used in prompts/training/validation.",
126    )
127    requirements: List[TaskRequirement] = Field(default=[])
128    output_json_schema: JsonObjectSchema | None = None
129    input_json_schema: JsonObjectSchema | None = None
130    thinking_instruction: str | None = Field(
131        default=None,
132        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting.",
133    )
134
135    default_run_config_id: ID_TYPE | None = Field(
136        default=None,
137        description="ID of the run config to use for this task by default. Must exist in saved run configs for this task.",
138    )
139
140    def output_schema(self) -> Dict | None:
141        if self.output_json_schema is None:
142            return None
143        return schema_from_json_str(self.output_json_schema)
144
145    def input_schema(self) -> Dict | None:
146        if self.input_json_schema is None:
147            return None
148        return schema_from_json_str(self.input_json_schema)
149
150    # These wrappers help for typechecking. We should fix this in KilnParentModel
151    def runs(self, readonly: bool = False) -> list[TaskRun]:
152        return super().runs(readonly=readonly)  # type: ignore
153
154    def dataset_splits(self, readonly: bool = False) -> list[DatasetSplit]:
155        return super().dataset_splits(readonly=readonly)  # type: ignore
156
157    def finetunes(self, readonly: bool = False) -> list[Finetune]:
158        return super().finetunes(readonly=readonly)  # type: ignore
159
160    def prompts(self, readonly: bool = False) -> list[Prompt]:
161        return super().prompts(readonly=readonly)  # type: ignore
162
163    def evals(self, readonly: bool = False) -> list[Eval]:
164        return super().evals(readonly=readonly)  # type: ignore
165
166    def run_configs(self, readonly: bool = False) -> list[TaskRunConfig]:
167        return super().run_configs(readonly=readonly)  # type: ignore
168
169    # Workaround to return typed parent without importing Task
170    def parent_project(self) -> Union["Project", None]:
171        if self.parent is None or self.parent.__class__.__name__ != "Project":
172            return None
173        return self.parent  # type: ignore

Represents a specific task to be performed, with associated requirements and validation rules.

Contains the task definition, requirements, input/output schemas, and maintains a collection of task runs.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
instruction: str
requirements: List[TaskRequirement]
output_json_schema: Optional[Annotated[str, AfterValidator(func=<function <lambda>>)]]
input_json_schema: Optional[Annotated[str, AfterValidator(func=<function <lambda>>)]]
thinking_instruction: str | None
default_run_config_id: Optional[str]
def output_schema(self) -> Optional[Dict]:
140    def output_schema(self) -> Dict | None:
141        if self.output_json_schema is None:
142            return None
143        return schema_from_json_str(self.output_json_schema)
def input_schema(self) -> Optional[Dict]:
145    def input_schema(self) -> Dict | None:
146        if self.input_json_schema is None:
147            return None
148        return schema_from_json_str(self.input_json_schema)
def runs(self, readonly=False) -> List[TaskRun]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def dataset_splits( self, readonly=False) -> List[DatasetSplit]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def finetunes(self, readonly=False) -> List[Finetune]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def prompts(self, readonly=False) -> List[Prompt]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def evals(self, readonly=False) -> List[kiln_ai.datamodel.eval.Eval]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def run_configs(self, readonly=False) -> List[kiln_ai.datamodel.task.TaskRunConfig]:
643        def child_method(self, readonly: bool = False) -> list[child_class]:
644            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def parent_project(self) -> Optional[Project]:
170    def parent_project(self) -> Union["Project", None]:
171        if self.parent is None or self.parent.__class__.__name__ != "Project":
172            return None
173        return self.parent  # type: ignore
def relationship_name() -> str:
661        def relationship_name_method() -> str:
662            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
654        def parent_class_method() -> Type[KilnParentModel]:
655            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
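
A minimal sketch of defining a structured task. Note the *_json_schema fields take JSON strings, while output_schema()/input_schema() return parsed dicts; the schema here is illustrative:

    import json
    from kiln_ai.datamodel import Task

    task = Task(
        name="summarize",
        instruction="Summarize the input text in one sentence.",
        output_json_schema=json.dumps(
            {
                "type": "object",
                "properties": {"answer": {"type": "string"}},
                "required": ["answer"],
            }
        ),
    )
    print(task.output_schema())  # parsed dict form of the JSON schema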

class TaskOutput(kiln_ai.datamodel.basemodel.KilnBaseModel):
322class TaskOutput(KilnBaseModel):
323    """
324    An output for a specific task run.
325
326    Contains the actual output content, its source (human or synthetic),
327    and optional rating information.
328    """
329
330    output: str = Field(
331        description="The output of the task. JSON formatted for structured output, plaintext for unstructured output."
332    )
333    source: DataSource | None = Field(
334        description="The source of the output: human or synthetic.",
335        default=None,
336    )
337    rating: TaskOutputRating | None = Field(
338        default=None, description="The rating of the output"
339    )
340
341    def validate_output_format(self, task: "Task") -> Self:
342        # validate output
343        if task.output_json_schema is not None:
344            try:
345                output_parsed = json.loads(self.output)
346            except json.JSONDecodeError:
347                raise ValueError("Output is not a valid JSON object")
348
349            validate_schema_with_value_error(
350                output_parsed,
351                task.output_json_schema,
352                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
353            )
354        return self
355
356    @model_validator(mode="after")
357    def validate_output_source(self, info: ValidationInfo) -> Self:
358        # On strict mode and not loaded from file, we validate output_source is not None.
359        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
360        if not strict_mode():
361            return self
362        if self.loaded_from_file(info):
363            return self
364        if self.source is None:
365            raise ValueError("Output source is required when strict mode is enabled")
366        return self

An output for a specific task run.

Contains the actual output content, its source (human or synthetic), and optional rating information.

output: str
source: DataSource | None
rating: TaskOutputRating | None
def validate_output_format(self, task: Task) -> Self:
341    def validate_output_format(self, task: "Task") -> Self:
342        # validate output
343        if task.output_json_schema is not None:
344            try:
345                output_parsed = json.loads(self.output)
346            except json.JSONDecodeError:
347                raise ValueError("Output is not a valid JSON object")
348
349            validate_schema_with_value_error(
350                output_parsed,
351                task.output_json_schema,
352                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
353            )
354        return self
@model_validator(mode='after')
def validate_output_source(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
356    @model_validator(mode="after")
357    def validate_output_source(self, info: ValidationInfo) -> Self:
358        # On strict mode and not loaded from file, we validate output_source is not None.
359        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
360        if not strict_mode():
361            return self
362        if self.loaded_from_file(info):
363            return self
364        if self.source is None:
365            raise ValueError("Output source is required when strict mode is enabled")
366        return self
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
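
A hedged validation sketch, assuming strict mode is disabled so `source` may be omitted; the task mirrors the structured-task example above:

    import json
    from kiln_ai.datamodel import Task, TaskOutput

    task = Task(
        name="summarize",
        instruction="Summarize the input text in one sentence.",
        output_json_schema=json.dumps(
            {
                "type": "object",
                "properties": {"answer": {"type": "string"}},
                "required": ["answer"],
            }
        ),
    )

    output = TaskOutput(output='{"answer": "A short summary."}')
    output.validate_output_format(task)  # passes: valid JSON matching the schema

    try:
        TaskOutput(output="not json").validate_output_format(task)
    except ValueError as err:
        print(err)  # "Output is not a valid JSON object"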

class TaskOutputRating(kiln_ai.datamodel.basemodel.KilnBaseModel):
 50class TaskOutputRating(KilnBaseModel):
 51    """
 52    A rating for a task output, including an overall rating and ratings for each requirement.
 53
 54    Supports:
 55    - five_star: 1-5 star ratings
 56    - pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
 57    - pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
 58    """
 59
 60    type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star)
 61    value: float | None = Field(
 62        description="The rating value. Interpretation depends on rating type:\n- five_star: 1-5 stars\n- pass_fail: 1.0 (pass) or 0.0 (fail)\n- pass_fail_critical: 1.0 (pass), 0.0 (fail), or -1.0 (critical fail)",
 63        default=None,
 64    )
 65    requirement_ratings: Dict[ID_TYPE, RequirementRating] = Field(
 66        default={},
 67        description="The ratings of the requirements of the task. The ID can be either a task_requirement_id or a named rating for an eval_output_score name (in format 'named::<name>').",
 68    )
 69
 70    # Previously we stored rating values as a dict of floats, but now we store them as RequirementRating objects.
 71    @model_validator(mode="before")
 72    def upgrade_old_format(cls, data: dict) -> dict:
 73        if not isinstance(data, dict):
 74            return data
 75
 76        # Check if we have the old format (dict of floats)
 77        req_ratings = data.get("requirement_ratings", {})
 78        if req_ratings and all(
 79            isinstance(v, (int, float)) for v in req_ratings.values()
 80        ):
 81            # Convert each float to a RequirementRating object
 82            # all ratings are five star at the point we used this format
 83            data["requirement_ratings"] = {
 84                k: {"value": v, "type": TaskOutputRatingType.five_star}
 85                for k, v in req_ratings.items()
 86            }
 87
 88        return data
 89
 90    # Used to select high quality outputs for example selection (MultiShotPromptBuilder, etc)
 91    def is_high_quality(self) -> bool:
 92        if self.value is None:
 93            return False
 94
 95        if self.type == TaskOutputRatingType.five_star:
 96            return self.value >= 4
 97        elif self.type == TaskOutputRatingType.pass_fail:
 98            return self.value == 1.0
 99        elif self.type == TaskOutputRatingType.pass_fail_critical:
100            return self.value == 1.0
101        return False
102
103    @model_validator(mode="after")
104    def validate_rating(self) -> Self:
105        if self.type not in TaskOutputRatingType:
106            raise ValueError(f"Invalid rating type: {self.type}")
107
108        # Overall rating is optional
109        if self.value is not None:
110            self._validate_rating(self.type, self.value, "overall rating")
111
112        for req_id, req_rating in self.requirement_ratings.items():
113            self._validate_rating(
114                req_rating.type,
115                req_rating.value,
116                f"requirement rating for req ID: {req_id}",
117            )
118
119        return self
120
121    def _validate_rating(
122        self, type: TaskOutputRatingType, rating: float | None, rating_name: str
123    ) -> None:
124        if type == TaskOutputRatingType.five_star:
125            self._validate_five_star(rating, rating_name)
126        elif type == TaskOutputRatingType.pass_fail:
127            self._validate_pass_fail(rating, rating_name)
128        elif type == TaskOutputRatingType.pass_fail_critical:
129            self._validate_pass_fail_critical(rating, rating_name)
130
131    def _validate_five_star(self, rating: float | None, rating_name: str) -> None:
132        if rating is None or not isinstance(rating, float) or not rating.is_integer():
133            raise ValueError(
134                f"{rating_name.capitalize()} of type five_star must be an integer value (1-5)"
135            )
136        if rating < 1 or rating > 5:
137            raise ValueError(
138                f"{rating_name.capitalize()} of type five_star must be between 1 and 5 stars"
139            )
140
141    def _validate_pass_fail(self, rating: float | None, rating_name: str) -> None:
142        if rating is None or not isinstance(rating, float) or not rating.is_integer():
143            raise ValueError(
144                f"{rating_name.capitalize()} of type pass_fail must be an integer value (0 or 1)"
145            )
146        if rating not in [0, 1]:
147            raise ValueError(
148                f"{rating_name.capitalize()} of type pass_fail must be 0 (fail) or 1 (pass)"
149            )
150
151    def _validate_pass_fail_critical(
152        self, rating: float | None, rating_name: str
153    ) -> None:
154        if rating is None or not isinstance(rating, float) or not rating.is_integer():
155            raise ValueError(
156                f"{rating_name.capitalize()} of type pass_fail_critical must be an integer value (-1, 0, or 1)"
157            )
158        if rating not in [-1, 0, 1]:
159            raise ValueError(
160                f"{rating_name.capitalize()} of type pass_fail_critical must be -1 (critical fail), 0 (fail), or 1 (pass)"
161            )

A rating for a task output, including an overall rating and ratings for each requirement.

Supports:

  • five_star: 1-5 star ratings
  • pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
  • pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
value: float | None
requirement_ratings: Dict[Optional[str], RequirementRating]
@model_validator(mode='before')
def upgrade_old_format(cls, data: dict) -> dict:
71    @model_validator(mode="before")
72    def upgrade_old_format(cls, data: dict) -> dict:
73        if not isinstance(data, dict):
74            return data
75
76        # Check if we have the old format (dict of floats)
77        req_ratings = data.get("requirement_ratings", {})
78        if req_ratings and all(
79            isinstance(v, (int, float)) for v in req_ratings.values()
80        ):
81            # Convert each float to a RequirementRating object
82            # all ratings are five star at the point we used this format
83            data["requirement_ratings"] = {
84                k: {"value": v, "type": TaskOutputRatingType.five_star}
85                for k, v in req_ratings.items()
86            }
87
88        return data
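
A small sketch of the legacy-format upgrade above: a plain float map is promoted to RequirementRating entries, all typed five_star (the only type in use when that format existed):

    from kiln_ai.datamodel import TaskOutputRating

    legacy = {"value": 5.0, "requirement_ratings": {"req_1": 4.0}}
    rating = TaskOutputRating.model_validate(legacy)
    print(rating.requirement_ratings["req_1"].type)  # five_star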
def is_high_quality(self) -> bool:
 91    def is_high_quality(self) -> bool:
 92        if self.value is None:
 93            return False
 94
 95        if self.type == TaskOutputRatingType.five_star:
 96            return self.value >= 4
 97        elif self.type == TaskOutputRatingType.pass_fail:
 98            return self.value == 1.0
 99        elif self.type == TaskOutputRatingType.pass_fail_critical:
100            return self.value == 1.0
101        return False
@model_validator(mode='after')
def validate_rating(self) -> Self:
103    @model_validator(mode="after")
104    def validate_rating(self) -> Self:
105        if self.type not in TaskOutputRatingType:
106            raise ValueError(f"Invalid rating type: {self.type}")
107
108        # Overall rating is optional
109        if self.value is not None:
110            self._validate_rating(self.type, self.value, "overall rating")
111
112        for req_id, req_rating in self.requirement_ratings.items():
113            self._validate_rating(
114                req_rating.type,
115                req_rating.value,
116                f"requirement rating for req ID: {req_id}",
117            )
118
119        return self
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
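
Illustrative ratings under each type; values must be integral floats, and out-of-range values raise at construction time via validate_rating:

    from kiln_ai.datamodel import TaskOutputRating, TaskOutputRatingType

    stars = TaskOutputRating(type=TaskOutputRatingType.five_star, value=4.0)
    print(stars.is_high_quality())  # True: 4 or 5 stars counts as high quality

    passed = TaskOutputRating(type=TaskOutputRatingType.pass_fail, value=1.0)
    print(passed.is_high_quality())  # True: 1.0 means pass

    try:
        TaskOutputRating(type=TaskOutputRatingType.five_star, value=6.0)
    except ValueError as err:
        print(err)  # must be between 1 and 5 stars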

class TaskOutputRatingType(builtins.str, enum.Enum):
15class TaskOutputRatingType(str, Enum):
16    """Defines the types of rating systems available for task outputs."""
17
18    five_star = "five_star"
19    pass_fail = "pass_fail"
20    pass_fail_critical = "pass_fail_critical"
21    custom = "custom"

Defines the types of rating systems available for task outputs.

five_star = <TaskOutputRatingType.five_star: 'five_star'>
pass_fail = <TaskOutputRatingType.pass_fail: 'pass_fail'>
pass_fail_critical = <TaskOutputRatingType.pass_fail_critical: 'pass_fail_critical'>
custom = <TaskOutputRatingType.custom: 'custom'>
class TaskRequirement(pydantic.main.BaseModel):
31class TaskRequirement(BaseModel):
32    """
33    Defines a specific requirement that should be met by task outputs.
34
35    Includes an identifier, name, description, instruction for meeting the requirement,
36    priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
37    """
38
39    id: ID_TYPE = ID_FIELD
40    name: FilenameStringShort = Field(description="The name of the task requirement.")
41    description: str | None = Field(default=None)
42    instruction: str = Field(min_length=1)
43    priority: Priority = Field(default=Priority.p2)
44    type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star)

Defines a specific requirement that should be met by task outputs.

Includes an identifier, name, description, instruction for meeting the requirement, priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).

id: Optional[str]
name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
instruction: str
priority: Priority
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
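
A short sketch of a pass/fail requirement, intended to be appended to a task's `requirements` list; the name and instruction are illustrative:

    from kiln_ai.datamodel import Priority, TaskOutputRatingType, TaskRequirement

    requirement = TaskRequirement(
        name="no_pii",
        instruction="The output must not contain personally identifiable information.",
        priority=Priority.p0,
        type=TaskOutputRatingType.pass_fail,
    )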

class TaskRun(kiln_ai.datamodel.basemodel.KilnParentedModel):
 77class TaskRun(KilnParentedModel):
 78    """
 79    Represents a single execution of a Task.
 80
 81    Contains the input used, its source, the output produced, and optional
 82    repair information if the output needed correction.
 83    """
 84
 85    input: str = Field(
 86        description="The inputs to the task. JSON formatted for structured input, plaintext for unstructured input."
 87    )
 88    input_source: DataSource | None = Field(
 89        default=None, description="The source of the input: human or synthetic."
 90    )
 91
 92    output: TaskOutput = Field(description="The output of the task run.")
 93    repair_instructions: str | None = Field(
 94        default=None,
 95        description="Instructions for fixing the output. Should define what is wrong, and how to fix it. Will be used by models for both generating a fixed output, and evaluating future models.",
 96    )
 97    repaired_output: TaskOutput | None = Field(
 98        default=None,
 99        description="A version of the output with issues fixed. This must be a 'fixed' version of the existing output, and not an entirely new output. If you wish to generate an ideal curatorial output for this task unrelated to this output, generate a new TaskOutput with type 'human' instead of using this field.",
100    )
101    intermediate_outputs: Dict[str, str] | None = Field(
102        default=None,
103        description="Intermediate outputs from the task run. Keys are the names of the intermediate output steps (cot=chain of thought, etc), values are the output data.",
104    )
105    tags: List[str] = Field(
106        default=[],
107        description="Tags for the task run. Tags are used to categorize task runs for filtering and reporting.",
108    )
109    usage: Usage | None = Field(
110        default=None,
111        description="Usage information for the task run. This includes the number of input tokens, output tokens, and total tokens used.",
112    )
113    trace: list[ChatCompletionMessageParam] | None = Field(
114        default=None,
115        description="The trace of the task run in OpenAI format. This is the list of messages that were sent to/from the model.",
116    )
117
118    def thinking_training_data(self) -> str | None:
119        """
120        Get the thinking training data from the task run.
121        """
122        if self.intermediate_outputs is None:
123            return None
124        return self.intermediate_outputs.get(
125            "reasoning"
126        ) or self.intermediate_outputs.get("chain_of_thought")
127
128    def has_thinking_training_data(self) -> bool:
129        """
130        Does this run have thinking data that we can use to train a thinking model?
131        """
132        return self.thinking_training_data() is not None
133
134    # Workaround to return typed parent without importing Task
135    def parent_task(self) -> Union["Task", None]:
136        if self.parent is None or self.parent.__class__.__name__ != "Task":
137            return None
138        return self.parent  # type: ignore
139
140    @model_validator(mode="after")
141    def validate_input_format(self, info: ValidationInfo) -> Self:
142        # Don't validate if loading from file (not new). Too slow.
143        # We don't allow changing task schema, so this is redundant validation.
144        # Note: we still validate if editing a loaded model
145        if self.loading_from_file(info):
146            # Consider loading an existing model as validated.
147            self._last_validated_input = self.input
148            return self
149
150        # Don't validate if input has not changed. Too slow to run this every time.
151        if (
152            hasattr(self, "_last_validated_input")
153            and self.input == self._last_validated_input
154        ):
155            return self
156
157        task = self.parent_task()
158        if task is None:
159            # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
160            return self
161
162        # validate input
163        if task.input_json_schema is not None:
164            try:
165                input_parsed = json.loads(self.input)
166            except json.JSONDecodeError:
167                raise ValueError("Input is not a valid JSON object")
168
169            validate_schema_with_value_error(
170                input_parsed,
171                task.input_json_schema,
172                "Input does not match task input schema.",
173            )
174
175        self._last_validated_input = self.input
176        return self
177
178    @model_validator(mode="after")
179    def validate_output_format(self, info: ValidationInfo) -> Self:
180        # Don't validate if loading from file (not new). Too slow.
181        # Note: we still validate if editing a loaded model's output.
182        if self.loading_from_file(info):
183            # Consider loading an existing model as validated.
184            self._last_validated_output = self.output.output if self.output else None
185            return self
186
187        # Don't validate unless output has changed since last validation.
188        # The validator is slow and costly, don't want it running when setting other fields.
189        if (
190            hasattr(self, "_last_validated_output")
191            and self.output is not None
192            and self.output.output == self._last_validated_output
193        ):
194            return self
195
196        task = self.parent_task()
197        if task is None:
198            return self
199
200        self.output.validate_output_format(task)
201        self._last_validated_output = self.output.output if self.output else None
202        return self
203
204    @model_validator(mode="after")
205    def validate_repaired_output(self) -> Self:
206        if self.repaired_output is not None:
207            if self.repaired_output.rating is not None:
208                raise ValueError(
209                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
210                )
211
212            task = self.parent_task()
213            if (
214                task is not None
215                and self.repaired_output.output is not None
216                and task.output_json_schema is not None
217            ):
218                try:
219                    output_parsed = json.loads(self.repaired_output.output)
220                except json.JSONDecodeError:
221                    raise ValueError("Repaired output is not a valid JSON object")
222
223                validate_schema_with_value_error(
224                    output_parsed,
225                    task.output_json_schema,
226                    "Repaired output does not match task output schema.",
227                )
228
229        if self.repair_instructions is None and self.repaired_output is not None:
230            raise ValueError(
231                "Repair instructions are required if providing a repaired output."
232            )
233        if self.repair_instructions is not None and self.repaired_output is None:
234            raise ValueError(
235                "A repaired output is required if providing repair instructions."
236            )
237
238        return self
239
240    @model_validator(mode="after")
241    def validate_input_source(self, info: ValidationInfo) -> Self:
242        # On strict mode and not loaded from file, we validate input_source is not None.
243        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
244        if not strict_mode():
245            return self
246        if self.loaded_from_file(info):
247            return self
248        if self.input_source is None:
249            raise ValueError("input_source is required when strict mode is enabled")
250        return self
251
252    @model_validator(mode="after")
253    def validate_tags(self) -> Self:
254        for tag in self.tags:
255            if not tag:
256                raise ValueError("Tags cannot be empty strings")
257            if " " in tag:
258                raise ValueError("Tags cannot contain spaces. Try underscores.")
259
260        return self

Represents a single execution of a Task.

Contains the input used, its source, the output produced, and optional repair information if the output needed correction.

input: str
input_source: DataSource | None
output: TaskOutput
repair_instructions: str | None
repaired_output: TaskOutput | None
intermediate_outputs: Optional[Dict[str, str]]
tags: List[str]
usage: Usage | None
trace: list[
    openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam
    | openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam
    | openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam
    | kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper
    | kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper
    | openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam
] | None
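
For orientation, a minimal construction sketch (hypothetical values; with no parent task attached, the schema validators below are skipped, and input_source may be omitted unless strict mode is enabled):

    from kiln_ai.datamodel import TaskOutput, TaskRun

    # Hypothetical free-standing run: no parent Task, so no schema checks apply.
    run = TaskRun(
        input="What is the capital of France?",
        output=TaskOutput(output="Paris"),
        tags=["manual_entry"],
    )
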
def thinking_training_data(self) -> str | None:
118    def thinking_training_data(self) -> str | None:
119        """
120        Get the thinking training data from the task run.
121        """
122        if self.intermediate_outputs is None:
123            return None
124        return self.intermediate_outputs.get(
125            "reasoning"
126        ) or self.intermediate_outputs.get("chain_of_thought")

Get the thinking training data from the task run.
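
The method checks the "reasoning" key first and falls back to "chain_of_thought"; has_thinking_training_data, below, simply tests for a non-None result. An illustrative sketch (hypothetical values):

    from kiln_ai.datamodel import TaskOutput, TaskRun

    run = TaskRun(
        input="What is the capital of France?",
        output=TaskOutput(output="Paris"),
        intermediate_outputs={"reasoning": "Paris is France's capital."},
    )
    run.thinking_training_data()       # -> "Paris is France's capital."
    run.has_thinking_training_data()   # -> True

    # Falls back to the "chain_of_thought" key when "reasoning" is absent:
    run.intermediate_outputs = {"chain_of_thought": "Step 1: recall geography."}
    run.thinking_training_data()       # -> "Step 1: recall geography."
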

def has_thinking_training_data(self) -> bool:
128    def has_thinking_training_data(self) -> bool:
129        """
130        Does this run have thinking data that we can use to train a thinking model?
131        """
132        return self.thinking_training_data() is not None

Does this run have thinking data that we can use to train a thinking model?

def parent_task(self) -> Optional[Task]:
135    def parent_task(self) -> Union["Task", None]:
136        if self.parent is None or self.parent.__class__.__name__ != "Task":
137            return None
138        return self.parent  # type: ignore
@model_validator(mode='after')
def validate_input_format(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
140    @model_validator(mode="after")
141    def validate_input_format(self, info: ValidationInfo) -> Self:
142        # Don't validate if loading from file (not new). Too slow.
143        # We don't allow changing task schema, so this is redundant validation.
144        # Note: we still validate if editing a loaded model
145        if self.loading_from_file(info):
146            # Consider loading an existing model as validated.
147            self._last_validated_input = self.input
148            return self
149
150        # Don't validate if input has not changed. Too slow to run this every time.
151        if (
152            hasattr(self, "_last_validated_input")
153            and self.input == self._last_validated_input
154        ):
155            return self
156
157        task = self.parent_task()
158        if task is None:
159            # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
160            return self
161
162        # validate input
163        if task.input_json_schema is not None:
164            try:
165                input_parsed = json.loads(self.input)
166            except json.JSONDecodeError:
167                raise ValueError("Input is not a valid JSON object")
168
169            validate_schema_with_value_error(
170                input_parsed,
171                task.input_json_schema,
172                "Input does not match task input schema.",
173            )
174
175        self._last_validated_input = self.input
176        return self
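
A sketch of this validator's behavior against a hypothetical parent task (this assumes the usual Kiln pattern of passing parent= at construction time, and strict mode left off):

    import json

    from kiln_ai.datamodel import Task, TaskOutput, TaskRun

    task = Task(
        name="adder",
        instruction="Add the two numbers.",
        input_json_schema=json.dumps(
            {
                "type": "object",
                "properties": {"a": {"type": "integer"}, "b": {"type": "integer"}},
                "required": ["a", "b"],
            }
        ),
    )

    # Passes: the input parses as JSON and matches the schema.
    TaskRun(parent=task, input='{"a": 1, "b": 2}', output=TaskOutput(output="3"))

    # Raises ValueError ("Input is not a valid JSON object"):
    # TaskRun(parent=task, input="one and two", output=TaskOutput(output="3"))
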
@model_validator(mode='after')
def validate_output_format(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
178    @model_validator(mode="after")
179    def validate_output_format(self, info: ValidationInfo) -> Self:
180        # Don't validate if loading from file (not new). Too slow.
181        # Note: we still validate if editing a loaded model's output.
182        if self.loading_from_file(info):
183            # Consider loading an existing model as validated.
184            self._last_validated_output = self.output.output if self.output else None
185            return self
186
187        # Don't validate unless output has changed since last validation.
188        # The validator is slow and costly, don't want it running when setting other fields.
189        if (
190            hasattr(self, "_last_validated_output")
191            and self.output is not None
192            and self.output.output == self._last_validated_output
193        ):
194            return self
195
196        task = self.parent_task()
197        if task is None:
198            return self
199
200        self.output.validate_output_format(task)
201        self._last_validated_output = self.output.output if self.output else None
202        return self
@model_validator(mode='after')
def validate_repaired_output(self) -> Self:
204    @model_validator(mode="after")
205    def validate_repaired_output(self) -> Self:
206        if self.repaired_output is not None:
207            if self.repaired_output.rating is not None:
208                raise ValueError(
209                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
210                )
211
212            task = self.parent_task()
213            if (
214                task is not None
215                and self.repaired_output.output is not None
216                and task.output_json_schema is not None
217            ):
218                try:
219                    output_parsed = json.loads(self.repaired_output.output)
220                except json.JSONDecodeError:
221                    raise ValueError("Repaired output is not a valid JSON object")
222
223                validate_schema_with_value_error(
224                    output_parsed,
225                    task.output_json_schema,
226                    "Repaired output does not match task output schema.",
227                )
228
229        if self.repair_instructions is None and self.repaired_output is not None:
230            raise ValueError(
231                "Repair instructions are required if providing a repaired output."
232            )
233        if self.repair_instructions is not None and self.repaired_output is None:
234            raise ValueError(
235                "A repaired output is required if providing repair instructions."
236            )
237
238        return self
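
The pairing rules enforced above, sketched with hypothetical values (note the repaired output must not carry a rating, since it is assumed perfect):

    from kiln_ai.datamodel import TaskOutput, TaskRun

    repaired_run = TaskRun(
        input="What is the capital of France?",
        output=TaskOutput(output="Parris"),
        repair_instructions="Fix the spelling of the city name.",
        repaired_output=TaskOutput(output="Paris"),  # rating stays None
    )

    # Supplying only one of repair_instructions / repaired_output, or a
    # repaired_output with a rating set, raises ValueError.
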
@model_validator(mode='after')
def validate_input_source(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
240    @model_validator(mode="after")
241    def validate_input_source(self, info: ValidationInfo) -> Self:
242        # On strict mode and not loaded from file, we validate input_source is not None.
243        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
244        if not strict_mode():
245            return self
246        if self.loaded_from_file(info):
247            return self
248        if self.input_source is None:
249            raise ValueError("input_source is required when strict mode is enabled")
250        return self
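
When strict mode is enabled, a new run must say where its input came from. A hedged sketch using the DataSource type exported by this module (the "created_by" property is illustrative):

    from kiln_ai.datamodel import DataSource, DataSourceType, TaskOutput, TaskRun

    sourced_run = TaskRun(
        input="What is the capital of France?",
        input_source=DataSource(
            type=DataSourceType.human,
            properties={"created_by": "jane"},
        ),
        output=TaskOutput(output="Paris"),
    )
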
@model_validator(mode='after')
def validate_tags(self) -> Self:
252    @model_validator(mode="after")
253    def validate_tags(self) -> Self:
254        for tag in self.tags:
255            if not tag:
256                raise ValueError("Tags cannot be empty strings")
257            if " " in tag:
258                raise ValueError("Tags cannot contain spaces. Try underscores.")
259
260        return self
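
The tag rules, inline (illustrative values):

    from kiln_ai.datamodel import TaskOutput, TaskRun

    TaskRun(
        input="What is the capital of France?",
        output=TaskOutput(output="Paris"),
        tags=["golden_set", "needs_review"],  # ok
    )
    # tags=["golden set"] -> ValueError: no spaces allowed; use underscores
    # tags=[""]           -> ValueError: empty tags are rejected
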
def relationship_name() -> str:
661        def relationship_name_method() -> str:
662            return relationship_name

Returns the name of the parent-child relationship this accessor was generated for.

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
654        def parent_class_method() -> Type[KilnParentModel]:
655            return cls

Returns the parent model class this accessor was generated for.

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args:
    self: The BaseModel instance.
    context: The context.

class Usage(pydantic.main.BaseModel):
18class Usage(BaseModel):
19    input_tokens: int | None = Field(
20        default=None,
21        description="The number of input tokens used in the task run.",
22        ge=0,
23    )
24    output_tokens: int | None = Field(
25        default=None,
26        description="The number of output tokens used in the task run.",
27        ge=0,
28    )
29    total_tokens: int | None = Field(
30        default=None,
31        description="The total number of tokens used in the task run.",
32        ge=0,
33    )
34    cost: float | None = Field(
35        default=None,
36        description="The cost of the task run in US dollars, saved at runtime (prices can change over time).",
37        ge=0,
38    )
39
40    def __add__(self, other: "Usage") -> "Usage":
41        """Add two Usage objects together, handling None values gracefully.
42
43        None + None = None
44        None + value = value
45        value + None = value
46        value1 + value2 = value1 + value2
47        """
48        if not isinstance(other, Usage):
49            raise TypeError(f"Cannot add Usage with {type(other).__name__}")
50
51        def _add_optional_int(a: int | None, b: int | None) -> int | None:
52            if a is None and b is None:
53                return None
54            if a is None:
55                return b
56            if b is None:
57                return a
58            return a + b
59
60        def _add_optional_float(a: float | None, b: float | None) -> float | None:
61            if a is None and b is None:
62                return None
63            if a is None:
64                return b
65            if b is None:
66                return a
67            return a + b
68
69        return Usage(
70            input_tokens=_add_optional_int(self.input_tokens, other.input_tokens),
71            output_tokens=_add_optional_int(self.output_tokens, other.output_tokens),
72            total_tokens=_add_optional_int(self.total_tokens, other.total_tokens),
73            cost=_add_optional_float(self.cost, other.cost),
74        )

!!! abstract "Usage Documentation" Models

A base class for creating Pydantic models.

Attributes: __class_vars__: The names of the class variables defined on the model. __private_attributes__: Metadata about the private attributes of the model. __signature__: The synthesized __init__ [Signature][inspect.Signature] of the model.

__pydantic_complete__: Whether model building is completed, or if there are still undefined fields.
__pydantic_core_schema__: The core schema of the model.
__pydantic_custom_init__: Whether the model has a custom `__init__` function.
__pydantic_decorators__: Metadata containing the decorators defined on the model.
    This replaces `Model.__validators__` and `Model.__root_validators__` from Pydantic V1.
__pydantic_generic_metadata__: Metadata for generic models; contains data used for a similar purpose to
    __args__, __origin__, __parameters__ in typing-module generics. May eventually be replaced by these.
__pydantic_parent_namespace__: Parent namespace of the model, used for automatic rebuilding of models.
__pydantic_post_init__: The name of the post-init method for the model, if defined.
__pydantic_root_model__: Whether the model is a [`RootModel`][pydantic.root_model.RootModel].
__pydantic_serializer__: The `pydantic-core` `SchemaSerializer` used to dump instances of the model.
__pydantic_validator__: The `pydantic-core` `SchemaValidator` used to validate instances of the model.

__pydantic_fields__: A dictionary of field names and their corresponding [`FieldInfo`][pydantic.fields.FieldInfo] objects.
__pydantic_computed_fields__: A dictionary of computed field names and their corresponding [`ComputedFieldInfo`][pydantic.fields.ComputedFieldInfo] objects.

__pydantic_extra__: A dictionary containing extra values, if [`extra`][pydantic.config.ConfigDict.extra]
    is set to `'allow'`.
__pydantic_fields_set__: The names of fields explicitly set during instantiation.
__pydantic_private__: Values of private attributes set on the model instance.
input_tokens: int | None
output_tokens: int | None
total_tokens: int | None
cost: float | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
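
Per the __add__ docstring above, None is treated as "unknown" rather than zero when combining usage across runs:

    from kiln_ai.datamodel import Usage

    a = Usage(input_tokens=100, output_tokens=20, total_tokens=120, cost=0.002)
    b = Usage(input_tokens=50, output_tokens=None, total_tokens=None, cost=None)

    combined = a + b
    # combined.input_tokens  == 150
    # combined.output_tokens == 20      (20 + None -> 20)
    # combined.total_tokens  == 120     (120 + None -> 120)
    # combined.cost          == 0.002   (0.002 + None -> 0.002)
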

prompt_generator_values = ['simple_prompt_builder', 'multi_shot_prompt_builder', 'few_shot_prompt_builder', 'repairs_prompt_builder', 'simple_chain_of_thought_prompt_builder', 'few_shot_chain_of_thought_prompt_builder', 'multi_shot_chain_of_thought_prompt_builder', 'short_prompt_builder']
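
These ids are plain strings, so membership checks are straightforward (illustrative):

    from kiln_ai.datamodel import prompt_generator_values

    generator_id = "simple_prompt_builder"
    assert generator_id in prompt_generator_values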