kiln_ai.datamodel

See our docs for details about our datamodel classes and hierarchy:

Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html

User docs: https://docs.kiln.tech/developers/kiln-datamodel

 1"""
 2See our docs for details about our datamodel classes and hierarchy:
 3
 4Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html
 5
 6User docs: https://docs.kiln.tech/developers/kiln-datamodel
 7"""
 8
 9# This component uses "flat" imports so we don't have too much internal structure exposed in the API.
10# for example you can just `from datamodel import Task, Project` instead of `from datamodel.task import Task; from datamodel.project import Project`
11
12from __future__ import annotations
13
14from kiln_ai.datamodel import (
15    chunk,
16    dataset_split,
17    embedding,
18    eval,
19    extraction,
20    rag,
21    reranker,
22    strict_mode,
23)
24from kiln_ai.datamodel.basemodel import generate_model_id
25from kiln_ai.datamodel.datamodel_enums import (
26    FineTuneStatusType,
27    Priority,
28    StructuredOutputMode,
29    TaskOutputRatingType,
30)
31from kiln_ai.datamodel.dataset_split import DatasetSplit, DatasetSplitDefinition
32from kiln_ai.datamodel.external_tool_server import ExternalToolServer
33from kiln_ai.datamodel.finetune import Finetune
34from kiln_ai.datamodel.project import Project
35from kiln_ai.datamodel.prompt import BasePrompt, Prompt
36from kiln_ai.datamodel.prompt_id import (
37    PromptGenerators,
38    PromptId,
39    prompt_generator_values,
40)
41from kiln_ai.datamodel.task import Task, TaskRequirement
42from kiln_ai.datamodel.task_output import (
43    DataSource,
44    DataSourceProperty,
45    DataSourceType,
46    RequirementRating,
47    TaskOutput,
48    TaskOutputRating,
49)
50from kiln_ai.datamodel.task_run import TaskRun, Usage
51
52__all__ = [
53    "BasePrompt",
54    "DataSource",
55    "DataSourceProperty",
56    "DataSourceType",
57    "DatasetSplit",
58    "DatasetSplitDefinition",
59    "ExternalToolServer",
60    "FineTuneStatusType",
61    "Finetune",
62    "Priority",
63    "Project",
64    "Prompt",
65    "PromptGenerators",
66    "PromptId",
67    "RequirementRating",
68    "StructuredOutputMode",
69    "Task",
70    "TaskOutput",
71    "TaskOutputRating",
72    "TaskOutputRatingType",
73    "TaskRequirement",
74    "TaskRun",
75    "Usage",
76    "chunk",
77    "dataset_split",
78    "embedding",
79    "eval",
80    "extraction",
81    "generate_model_id",
82    "prompt_generator_values",
83    "rag",
84    "reranker",
85    "strict_mode",
86]
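
Thanks to the flat structure, the common classes can be imported directly from the package root. A minimal usage sketch (names and field values are illustrative, and Task's `instruction` field is an assumption based on the Task class docs, which are not reproduced in this section):

from kiln_ai.datamodel import Project, Task

# Hypothetical project and task; see the class docs below for field details.
project = Project(name="joke_generator")
task = Task(
    name="generate_joke",
    instruction="Generate a joke about the provided topic.",
    parent=project,
)
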
class BasePrompt(pydantic.main.BaseModel):
class BasePrompt(BaseModel):
    """
    A prompt for a task. This is the basic data storage format which can be used throughout a project.

    The "Prompt" model name is reserved for the custom prompts parented by a task.
    """

    name: FilenameString = Field(description="The name of the prompt.")
    description: str | None = Field(
        default=None,
        description="A more detailed description of the prompt.",
    )
    generator_id: str | None = Field(
        default=None,
        description="The id of the generator that created this prompt.",
    )
    prompt: str = Field(
        description="The prompt for the task.",
        min_length=1,
    )
    chain_of_thought_instructions: str | None = Field(
        default=None,
        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting. COT will not be used unless this is provided.",
    )

A prompt for a task. This is the basic data storage format which can be used throughout a project.

The "Prompt" model name is reserved for the custom prompts parented by a task.

name: FilenameString
description: str | None
generator_id: str | None
prompt: str
chain_of_thought_instructions: str | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
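
A construction sketch (field values are illustrative):

from kiln_ai.datamodel import BasePrompt

prompt = BasePrompt(
    name="concise_summary",
    prompt="Summarize the input in one paragraph.",
    chain_of_thought_instructions="List the key points before writing the summary.",
)
# `prompt` must be non-empty (min_length=1); omitting it raises a pydantic ValidationError.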

class DataSource(pydantic.main.BaseModel):
class DataSource(BaseModel):
    """
    Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

    Properties vary based on the source type - for synthetic/tool_call sources this includes
    model information, for human sources this includes creator information, for file imports
    this includes file information.
    """

    type: DataSourceType
    properties: Dict[str, str | int | float] = Field(
        default={},
        description="Properties describing the data source. For synthetic: things like the model used. For human: the human's name. For file_import: file information.",
    )
    run_config: Optional[RunConfigProperties] = Field(
        default=None,
        description="The run config used to generate the data, if generated by running a model in Kiln (only true for type=synthetic).",
    )

    _data_source_properties = [
        DataSourceProperty(
            name="created_by",
            type=str,
            required_for=[DataSourceType.human],
            not_allowed_for=[
                DataSourceType.synthetic,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_provider",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="adapter_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # Legacy field -- allow loading from old runs, but we shouldn't be setting it.
            name="prompt_builder_name",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # The PromptId of the prompt. Can be a saved prompt, fine-tune, generator name, etc. See PromptId type for more details.
            name="prompt_id",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="file_name",
            type=str,
            required_for=[DataSourceType.file_import],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.synthetic,
                DataSourceType.tool_call,
            ],
        ),
    ]

    @model_validator(mode="after")
    def validate_type(self) -> "DataSource":
        if self.type not in DataSourceType:
            raise ValueError(f"Invalid data source type: {self.type}")
        return self

    @model_validator(mode="after")
    def validate_properties(self) -> "DataSource":
        for prop in self._data_source_properties:
            # Check the property type is correct
            if prop.name in self.properties:
                if not isinstance(self.properties[prop.name], prop.type):
                    raise ValueError(
                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
                    )
            # Check the property is required for the data source type
            if self.type in prop.required_for:
                if prop.name not in self.properties:
                    raise ValueError(
                        f"'{prop.name}' is required for {self.type} data source"
                    )
            # Check the property is not allowed for the data source type
            elif self.type in prop.not_allowed_for and prop.name in self.properties:
                raise ValueError(
                    f"'{prop.name}' is not allowed for {self.type} data source"
                )
        return self

    @model_validator(mode="after")
    def validate_no_empty_properties(self) -> Self:
        for prop, value in self.properties.items():
            if isinstance(value, str) and value == "":
                raise ValueError(
                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
                )
        return self

Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

Properties vary based on the source type - for synthetic/tool_call sources this includes model information, for human sources this includes creator information, for file imports this includes file information.

properties: Dict[str, str | int | float]
run_config: Optional[kiln_ai.datamodel.run_config.RunConfigProperties]
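
For example, per the property rules above, a human source must set `created_by`, while a synthetic source must set model information instead (a sketch; the values are illustrative):

from kiln_ai.datamodel import DataSource, DataSourceType

human_source = DataSource(
    type=DataSourceType.human,
    properties={"created_by": "jane@example.com"},
)

synthetic_source = DataSource(
    type=DataSourceType.synthetic,
    properties={
        "model_name": "gpt_4o",
        "model_provider": "openai",
        "adapter_name": "langchain_adapter",
    },
)

# A missing required property raises a pydantic ValidationError:
# DataSource(type=DataSourceType.human, properties={})
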
@model_validator(mode='after')
def validate_type(self) -> DataSource:

@model_validator(mode='after')
def validate_properties(self) -> DataSource:

@model_validator(mode='after')
def validate_no_empty_properties(self) -> Self:

model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class DataSourceProperty(pydantic.main.BaseModel):
class DataSourceProperty(BaseModel):
    """
    Defines a property that can be associated with a data source.

    Includes validation rules for when properties are required or not allowed
    based on the data source type.
    """

    name: str
    type: Type[Union[str, int, float]]
    required_for: List[DataSourceType] = []
    not_allowed_for: List[DataSourceType] = []

Defines a property that can be associated with a data source.

Includes validation rules for when properties are required or not allowed based on the data source type.

name: str
type: Type[Union[str, int, float]]
required_for: List[DataSourceType]
not_allowed_for: List[DataSourceType]
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class DataSourceType(builtins.str, enum.Enum):
class DataSourceType(str, Enum):
    """
    The source type of a piece of data.

    Human: a human created the data
    Synthetic: a model created the data
    File import: the data was imported from a file
    Tool call: the data was produced by a tool call
    """

    human = "human"
    synthetic = "synthetic"
    file_import = "file_import"
    tool_call = "tool_call"

The source type of a piece of data.

Human: a human created the data. Synthetic: a model created the data. File import: the data was imported from a file. Tool call: the data was produced by a tool call.

human = <DataSourceType.human: 'human'>
synthetic = <DataSourceType.synthetic: 'synthetic'>
file_import = <DataSourceType.file_import: 'file_import'>
tool_call = <DataSourceType.tool_call: 'tool_call'>
class DatasetSplit(kiln_ai.datamodel.basemodel.KilnParentedModel):
class DatasetSplit(KilnParentedModel):
    """
    A collection of task runs, with optional splits (train, test, validation).

    Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

    Maintains a list of IDs for each split, to avoid data duplication.
    """

    name: FilenameString = Field(description="The name of the dataset split.")
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    splits: list[DatasetSplitDefinition] = Field(
        default_factory=list,
        description="The splits in the dataset.",
    )
    split_contents: dict[str, list[str]] = Field(
        description="The contents of each split in the dataset. The key is the split name, and the value is a list of task run IDs.",
    )
    filter: DatasetFilterId | None = Field(
        default=None,
        description="The filter used to build the dataset.",
    )

    @model_validator(mode="after")
    def validate_split_percentages(self) -> "DatasetSplit":
        total = sum(split.percentage for split in self.splits)
        if not math.isclose(total, 1.0, rel_tol=1e-9):
            raise ValueError(f"The sum of split percentages must be 1.0 (got {total})")
        return self

    @classmethod
    def from_task(
        cls,
        name: str,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter_id: DatasetFilterId = "all",
        description: str | None = None,
    ):
        """
        Build a dataset split from a task.
        """
        filter = dataset_filter_from_id(filter_id)
        split_contents = cls.build_split_contents(task, splits, filter)
        return cls(
            parent=task,
            name=name,
            description=description,
            splits=splits,
            split_contents=split_contents,
            filter=filter_id,
        )

    @classmethod
    def build_split_contents(
        cls,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter: DatasetFilter,
    ) -> dict[str, list[str]]:
        valid_ids = []
        for task_run in task.runs():
            if filter(task_run):
                valid_ids.append(task_run.id)

        # Shuffle and split by split percentage
        random.shuffle(valid_ids)
        split_contents = {}
        start_idx = 0
        remaining_items = len(valid_ids)

        # Handle all splits except the last one
        for split in splits[:-1]:
            split_size = round(len(valid_ids) * split.percentage)
            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
            start_idx += split_size
            remaining_items -= split_size

        # Last split gets all remaining items (for rounding)
        if splits:
            split_contents[splits[-1].name] = valid_ids[start_idx:]

        return split_contents

    def parent_task(self) -> "Task | None":
        # inline import to avoid circular import
        from kiln_ai.datamodel import Task

        if not isinstance(self.parent, Task):
            return None
        return self.parent

    def missing_count(self) -> int:
        """
        Returns:
            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
        """
        parent = self.parent_task()
        if parent is None:
            raise ValueError("DatasetSplit has no parent task")

        runs = parent.runs(readonly=True)
        all_ids = set(run.id for run in runs)
        all_ids_in_splits = set()
        for ids in self.split_contents.values():
            all_ids_in_splits.update(ids)
        missing = all_ids_in_splits - all_ids
        return len(missing)

A collection of task runs, with optional splits (train, test, validation).

Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

Maintains a list of IDs for each split, to avoid data duplication.

name: FilenameString
description: str | None
splits: list[DatasetSplitDefinition]
split_contents: dict[str, list[str]]
filter: DatasetFilterId | None
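
A typical flow freezes the current task runs into train/test splits via `from_task` (a sketch; `task` is assumed to be an existing Task with saved runs):

from kiln_ai.datamodel import DatasetSplit, DatasetSplitDefinition

splits = [
    DatasetSplitDefinition(name="train", percentage=0.8),
    DatasetSplitDefinition(name="test", percentage=0.2),
]
dataset = DatasetSplit.from_task("my_split", task, splits)
dataset.save_to_file()  # persist alongside the parent task, as with other parented models
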
@model_validator(mode='after')
def validate_split_percentages(self) -> DatasetSplit:

@classmethod
def from_task(cls, name: str, task: Task, splits: list[DatasetSplitDefinition], filter_id: DatasetFilterId = 'all', description: str | None = None):

Build a dataset split from a task.

@classmethod
def build_split_contents(cls, task: Task, splits: list[DatasetSplitDefinition], filter: kiln_ai.datamodel.dataset_filters.DatasetFilter) -> dict[str, list[str]]:

def parent_task(self) -> Task | None:

def missing_count(self) -> int:

Returns: int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset

def relationship_name() -> str:

Returns the name of the relationship between this model and its parent (generated dynamically by KilnParentedModel).

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:

Returns the parent model type for this relationship (generated dynamically by KilnParentedModel).

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class DatasetSplitDefinition(pydantic.main.BaseModel):
class DatasetSplitDefinition(BaseModel):
    """
    A definition of a split in a dataset.

    Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
    """

    name: FilenameString = Field(
        description="The name of the dataset split definition."
    )
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    percentage: float = Field(
        ge=0.0,
        le=1.0,
        description="The percentage of the dataset that this split represents (between 0 and 1).",
    )

A definition of a split in a dataset.

Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)

name: FilenameString
description: str | None
percentage: float
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
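
The percentages across all definitions in a DatasetSplit must sum to 1.0; for example (illustrative values):

from kiln_ai.datamodel import DatasetSplitDefinition

train = DatasetSplitDefinition(name="train", percentage=0.8)
validation = DatasetSplitDefinition(name="val", percentage=0.1)
test = DatasetSplitDefinition(name="test", percentage=0.1)
# DatasetSplit.validate_split_percentages raises a ValueError if the sum is not 1.0.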

class ExternalToolServer(kiln_ai.datamodel.basemodel.KilnParentedModel):
class ExternalToolServer(KilnParentedModel):
    """
    Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.

    This model stores the necessary configuration to connect to and authenticate with
    external MCP servers that provide tools for LLM interactions.
    """

    name: FilenameString = Field(description="The name of the external tool.")
    type: ToolServerType = Field(
        description="The type of external tool server. Remote tools are hosted on a remote server",
    )
    description: str | None = Field(
        default=None,
        description="A description of the external tool for you and your team. Will not be used in prompts/training/validation.",
    )

    properties: (
        LocalServerProperties | RemoteServerProperties | KilnTaskServerProperties
    ) = Field(
        description="Configuration properties specific to the tool type.",
    )

    # Private variable to store unsaved secrets
    _unsaved_secrets: dict[str, str] = PrivateAttr(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        # Process secrets after initialization (pydantic v2 hook)
        self._process_secrets_from_properties()

    def _process_secrets_from_properties(self) -> None:
        """
        Extract secrets from properties and move them to _unsaved_secrets.
        This removes secrets from the properties dict so they aren't saved to file.
        Clears existing _unsaved_secrets first to handle property updates correctly.
        """
        # Clear existing unsaved secrets since we're reprocessing
        self._unsaved_secrets.clear()

        secret_keys = self.get_secret_keys()

        if not secret_keys:
            return

        # Extract secret values from properties based on server type
        match self.type:
            case ToolServerType.remote_mcp:
                headers = self.properties.get("headers", {})
                for key_name in secret_keys:
                    if key_name in headers:
                        self._unsaved_secrets[key_name] = headers[key_name]
                        # Remove from headers immediately so they are not saved to file
                        del headers[key_name]

            case ToolServerType.local_mcp:
                env_vars = self.properties.get("env_vars", {})
                for key_name in secret_keys:
                    if key_name in env_vars:
                        self._unsaved_secrets[key_name] = env_vars[key_name]
                        # Remove from env_vars immediately so they are not saved to file
                        del env_vars[key_name]

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(self.type)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Override __setattr__ to process secrets whenever properties are updated.
        """
        super().__setattr__(name, value)

        # Process secrets whenever properties are updated
        if name == "properties":
            self._process_secrets_from_properties()

    # Validation Helpers

    @classmethod
    def check_server_url(cls, server_url: str) -> None:
        """Validate Server URL"""
        if not isinstance(server_url, str):
            raise ValueError("Server URL must be a string")

        # Check for leading whitespace in URL
        if server_url != server_url.lstrip():
            raise ValueError("Server URL must not have leading whitespace")

        parsed_url = urlparse(server_url)
        if not parsed_url.netloc:
            raise ValueError("Server URL is not a valid URL")
        if parsed_url.scheme not in ["http", "https"]:
            raise ValueError("Server URL must start with http:// or https://")

    @classmethod
    def check_headers(cls, headers: dict) -> None:
        """Validate Headers"""
        if not isinstance(headers, dict):
            raise ValueError("headers must be a dictionary")

        for key, value in headers.items():
            if not key:
                raise ValueError("Header name is required")
            if not value:
                raise ValueError("Header value is required")

            # Reject invalid header names and CR/LF in names/values
            token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
            if not token_re.match(key):
                raise ValueError(f'Invalid header name: "{key}"')
            if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
                raise ValueError(
                    "Header names/values must not contain invalid characters"
                )

    @classmethod
    def check_secret_keys(
        cls, secret_keys: list, key_type: str, tool_type: str
    ) -> None:
        """Validate Secret Keys (generic method for both header and env var keys)"""
        if not isinstance(secret_keys, list):
            raise ValueError(
                f"{key_type} must be a list for external tools of type '{tool_type}'"
            )
        if not all(isinstance(k, str) for k in secret_keys):
            raise ValueError(f"{key_type} must contain only strings")
        if not all(key for key in secret_keys):
            raise ValueError("Secret key is required")

    @classmethod
    def check_env_vars(cls, env_vars: dict) -> None:
        """Validate Environment Variables"""
        if not isinstance(env_vars, dict):
            raise ValueError("environment variables must be a dictionary")

        # Validate env_vars keys are in the correct format for Environment Variables
        # According to POSIX specification, environment variable names must:
        # - Start with a letter (a-z, A-Z) or underscore (_)
        # - Contain only ASCII letters, digits, and underscores
        for key, _ in env_vars.items():
            if not key or not (
                key[0].isascii() and (key[0].isalpha() or key[0] == "_")
            ):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Must start with a letter or underscore."
                )

            if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
                )

    @classmethod
    def type_from_data(cls, data: dict) -> ToolServerType:
        """Get the tool server type from the data for the validators"""
        raw_type = data.get("type")
        if raw_type is None:
            raise ValueError("type is required")
        try:
            return ToolServerType(raw_type)
        except ValueError:
            valid_types = ", ".join(type.value for type in ToolServerType)
            raise ValueError(f"type must be one of: {valid_types}")

    @model_validator(mode="before")
    def upgrade_old_properties(cls, data: dict) -> dict:
        """
        Upgrade properties for backwards compatibility.
        """
        properties = data.get("properties")
        if properties is not None and "is_archived" not in properties:
            # Add is_archived field with default value back to data
            properties["is_archived"] = False
            data["properties"] = properties
        return data

    @model_validator(mode="before")
    def validate_required_fields(cls, data: dict) -> dict:
        """Validate that each tool type has the required configuration."""
        server_type = ExternalToolServer.type_from_data(data)
        properties = data.get("properties", {})

        match server_type:
            case ToolServerType.remote_mcp:
                server_url = properties.get("server_url", None)
                if server_url is None:
                    raise ValueError(
                        "Server URL is required to connect to a remote MCP server"
                    )
                ExternalToolServer.check_server_url(server_url)

            case ToolServerType.local_mcp:
                command = properties.get("command", None)
                if command is None:
                    raise ValueError("command is required to start a local MCP server")
                if not isinstance(command, str):
                    raise ValueError(
                        "command must be a string to start a local MCP server"
                    )
                # Reject empty/whitespace-only command strings
                if command.strip() == "":
                    raise ValueError("command must be a non-empty string")

                args = properties.get("args", None)
                if args is not None:
                    if not isinstance(args, list):
                        raise ValueError(
                            "arguments must be a list to start a local MCP server"
                        )

            case ToolServerType.kiln_task:
                tool_name_validator(properties.get("name", ""))
                err_msg_prefix = "Kiln task server properties:"
                validate_return_dict_prop(
                    properties, "description", str, err_msg_prefix
                )
                description = properties.get("description", "")
                if len(description) > 128:
                    raise ValueError("description must be 128 characters or less")
                validate_return_dict_prop(
                    properties, "is_archived", bool, err_msg_prefix
                )
                validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
                validate_return_dict_prop(
                    properties, "run_config_id", str, err_msg_prefix
                )

            case _:
                # Type checking will catch missing cases
                raise_exhaustive_enum_error(server_type)
        return data

    @model_validator(mode="before")
    def validate_headers_and_env_vars(cls, data: dict) -> dict:
        """
        Validate secrets. These need to be validated before model initialization because secrets will be processed and stripped.
        """
        type = ExternalToolServer.type_from_data(data)

        properties = data.get("properties", {})
        if properties is None:
            raise ValueError("properties is required")

        match type:
            case ToolServerType.remote_mcp:
                # Validate headers
                headers = properties.get("headers", None)
                if headers is not None:
                    ExternalToolServer.check_headers(headers)

                # Secret header keys are optional, validate if they are set
                secret_header_keys = properties.get("secret_header_keys", None)
                if secret_header_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_header_keys, "secret_header_keys", "remote_mcp"
                    )

            case ToolServerType.local_mcp:
                # Validate secret environment variable keys
                env_vars = properties.get("env_vars", {})
                if env_vars is not None:
                    ExternalToolServer.check_env_vars(env_vars)

                # Secret env var keys are optional, but if they are set, they must be a list of strings
                secret_env_var_keys = properties.get("secret_env_var_keys", None)
                if secret_env_var_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_env_var_keys, "secret_env_var_keys", "local_mcp"
                    )

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(type)

        return data

    def get_secret_keys(self) -> list[str]:
        """
        Get the list of secret key names based on server type.

        Returns:
            List of secret key names (header names for remote, env var names for local)
        """
        match self.type:
            case ToolServerType.remote_mcp:
                return self.properties.get("secret_header_keys", [])
            case ToolServerType.local_mcp:
                return self.properties.get("secret_env_var_keys", [])
            case ToolServerType.kiln_task:
                return []
            case _:
                raise_exhaustive_enum_error(self.type)

    def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
        """
        Retrieve secrets from configuration system or in-memory storage.
        Automatically determines which secret keys to retrieve based on the server type.
        Config secrets take precedence over unsaved secrets.

        Returns:
            Tuple of (secrets_dict, missing_secrets_list) where:
            - secrets_dict: Dictionary mapping key names to their secret values
            - missing_secrets_list: List of secret key names that are missing values
        """
        secrets = {}
        missing_secrets = []
        secret_keys = self.get_secret_keys()

        if secret_keys and len(secret_keys) > 0:
            config = Config.shared()
            mcp_secrets = config.get_value(MCP_SECRETS_KEY)

            for key_name in secret_keys:
                secret_value = None

                # First check config secrets (persistent storage), key is mcp_server_id::key_name
                secret_key = self._config_secret_key(key_name)
                secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None

                # Fall back to unsaved secrets (in-memory storage)
                if (
                    not secret_value
                    and hasattr(self, "_unsaved_secrets")
                    and key_name in self._unsaved_secrets
                ):
                    secret_value = self._unsaved_secrets[key_name]

                if secret_value:
                    secrets[key_name] = secret_value
                else:
                    missing_secrets.append(key_name)

        return secrets, missing_secrets

    def _save_secrets(self) -> None:
        """
        Save unsaved secrets to the configuration system.
        """
        secret_keys = self.get_secret_keys()

        # No secrets to save
        if not secret_keys:
            return

        if self.id is None:
            raise ValueError("Server ID cannot be None when saving secrets")

        # Check if secrets are already saved
        if not hasattr(self, "_unsaved_secrets") or not self._unsaved_secrets:
            return

        config = Config.shared()
        mcp_secrets: dict[str, str] = config.get_value(MCP_SECRETS_KEY) or {}

        # Store secrets with the pattern: mcp_server_id::key_name
        for key_name, secret_value in self._unsaved_secrets.items():
            secret_key = self._config_secret_key(key_name)
            mcp_secrets[secret_key] = secret_value

        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

        # Clear unsaved secrets after saving
        self._unsaved_secrets.clear()

    def delete_secrets(self) -> None:
        """
        Delete all secrets for this tool server from the configuration system.
        """
        secret_keys = self.get_secret_keys()

        config = Config.shared()
        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()

        # Remove secrets with the pattern: mcp_server_id::key_name
        for key_name in secret_keys:
            secret_key = self._config_secret_key(key_name)
            if secret_key in mcp_secrets:
                del mcp_secrets[secret_key]

        # Always call update_settings to maintain consistency with the old behavior
        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

    def save_to_file(self) -> None:
        """
        Override save_to_file to automatically save any unsaved secrets before saving to file.

        This ensures that secrets are always saved when the object is saved,
        preventing the issue where secrets could be lost if save_to_file is called
        without explicitly saving secrets first.
        """
        # Save any unsaved secrets first
        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
            self._save_secrets()

        # Call the parent save_to_file method
        super().save_to_file()

    #  Internal helpers

    def _config_secret_key(self, key_name: str) -> str:
        """
        Generate the secret key pattern for storing/retrieving secrets.

        Args:
            key_name: The name of the secret key

        Returns:
            The formatted secret key: "{server_id}::{key_name}"
        """
        return f"{self.id}::{key_name}"

Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.

This model stores the necessary configuration to connect to and authenticate with external MCP servers that provide tools for LLM interactions.

name: FilenameString
type: kiln_ai.datamodel.external_tool_server.ToolServerType
description: str | None
properties: kiln_ai.datamodel.external_tool_server.LocalServerProperties | kiln_ai.datamodel.external_tool_server.RemoteServerProperties | kiln_ai.datamodel.external_tool_server.KilnTaskServerProperties
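
A sketch of configuring a remote MCP server with a secret header (the URL and header values are hypothetical):

from kiln_ai.datamodel import ExternalToolServer
from kiln_ai.datamodel.external_tool_server import ToolServerType

server = ExternalToolServer(
    name="weather_tools",
    type=ToolServerType.remote_mcp,
    properties={
        "server_url": "https://mcp.example.com",
        "headers": {"Authorization": "Bearer sk-..."},
        "secret_header_keys": ["Authorization"],
    },
)
# "Authorization" is stripped from properties and held in memory until the model
# is saved; as a KilnParentedModel it is normally saved under a parent project.
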
def model_post_init(self, _ExternalToolServer__context: Any) -> None:

Processes secrets from the properties after model initialization (pydantic v2 hook), moving them into in-memory storage so they are not written to file.

@classmethod
def check_server_url(cls, server_url: str) -> None:

Validate Server URL
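
For example (illustrative URLs):

ExternalToolServer.check_server_url("https://mcp.example.com")   # passes
ExternalToolServer.check_server_url(" https://mcp.example.com")  # raises: leading whitespace
ExternalToolServer.check_server_url("ftp://mcp.example.com")     # raises: scheme must be http(s)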

@classmethod
def check_headers(cls, headers: dict) -> None:

Validate Headers

@classmethod
def check_secret_keys(cls, secret_keys: list, key_type: str, tool_type: str) -> None:

Validate Secret Keys (generic method for both header and env var keys)

@classmethod
def check_env_vars(cls, env_vars: dict) -> None:

Validate Environment Variables

@classmethod
def type_from_data(cls, data: dict) -> kiln_ai.datamodel.external_tool_server.ToolServerType:

Get the tool server type from the data for the validators.

@model_validator(mode='before')
def upgrade_old_properties(cls, data: dict) -> dict:

Upgrade properties for backwards compatibility.

@model_validator(mode='before')
def validate_required_fields(cls, data: dict) -> dict:

Validate that each tool type has the required configuration.

@model_validator(mode='before')
def validate_headers_and_env_vars(cls, data: dict) -> dict:

Validate secrets. These need to be validated before model initialization because secrets will be processed and stripped from the properties.

def get_secret_keys(self) -> list[str]:

Get the list of secret key names based on server type.

Returns: List of secret key names (header names for remote, env var names for local)

def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:

Retrieve secrets from configuration system or in-memory storage. Automatically determines which secret keys to retrieve based on the server type. Config secrets take precedence over unsaved secrets.

Returns: Tuple of (secrets_dict, missing_secrets_list) where: - secrets_dict: Dictionary mapping key names to their secret values - missing_secrets_list: List of secret key names that are missing values
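
A caller might resolve secrets and fail fast on missing values. This sketch reuses the hypothetical server from above:

    secrets, missing = server.retrieve_secrets()
    if missing:
        raise RuntimeError(f"Missing secret values for: {', '.join(missing)}")
    auth_header_value = secrets["Authorization"]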

def delete_secrets(self) -> None:
420    def delete_secrets(self) -> None:
421        """
422        Delete all secrets for this tool server from the configuration system.
423        """
424        secret_keys = self.get_secret_keys()
425
426        config = Config.shared()
427        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()
428
429        # Remove secrets with the pattern: mcp_server_id::key_name
430        for key_name in secret_keys:
431            secret_key = self._config_secret_key(key_name)
432            if secret_key in mcp_secrets:
433                del mcp_secrets[secret_key]
434
435        # Always call update_settings to maintain consistency with the old behavior
436        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

Delete all secrets for this tool server from the configuration system.

def save_to_file(self) -> None:
438    def save_to_file(self) -> None:
439        """
440        Override save_to_file to automatically save any unsaved secrets before saving to file.
441
442        This ensures that secrets are always saved when the object is saved,
443        preventing the issue where secrets could be lost if save_to_file is called
444        without explicitly saving secrets first.
445        """
446        # Save any unsaved secrets first
447        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
448            self._save_secrets()
449
450        # Call the parent save_to_file method
451        super().save_to_file()

Override save_to_file to automatically save any unsaved secrets before saving to file.

This ensures that secrets are always saved when the object is saved, preventing the issue where secrets could be lost if save_to_file is called without explicitly saving secrets first.
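
In practice a caller therefore only needs the one call; persisting secrets happens as part of the normal save (sketch, continuing the example above):

    # Any secrets set in memory but not yet written to the config
    # system are persisted before the model file is written.
    server.save_to_file()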

def relationship_name() -> str:
713        def relationship_name_method() -> str:
714            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
706        def parent_class_method() -> Type[KilnParentModel]:
707            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class FineTuneStatusType(builtins.str, enum.Enum):
53class FineTuneStatusType(str, Enum):
54    """
55    The status type of a fine-tune (running, completed, failed, etc).
56    """
57
58    unknown = "unknown"  # server error
59    pending = "pending"
60    running = "running"
61    completed = "completed"
62    failed = "failed"

The status type of a fine-tune (running, completed, failed, etc).

unknown = <FineTuneStatusType.unknown: 'unknown'>
pending = <FineTuneStatusType.pending: 'pending'>
running = <FineTuneStatusType.running: 'running'>
completed = <FineTuneStatusType.completed: 'completed'>
failed = <FineTuneStatusType.failed: 'failed'>
class Finetune(kiln_ai.datamodel.basemodel.KilnParentedModel):
 23class Finetune(KilnParentedModel):
 24    """
 25    The Kiln fine-tune datamodel.
 26
 27    Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
 28    """
 29
 30    name: FilenameString = Field(description="The name of the fine-tune.")
 31    description: str | None = Field(
 32        default=None,
 33        description="A description of the fine-tune for you and your team. Not used in training.",
 34    )
 35    structured_output_mode: StructuredOutputMode | None = Field(
 36        default=None,
 37        description="The mode to use to train the model for structured output, if it was trained with structured output. Will determine how we call the tuned model, so we call with the matching mode.",
 38    )
 39    provider: str = Field(
 40        description="The provider to use for the fine-tune (e.g. 'openai')."
 41    )
 42    base_model_id: str = Field(
 43        description="The id of the base model to use for the fine-tune. This string relates to the provider's IDs for their own models, not Kiln IDs."
 44    )
 45    provider_id: str | None = Field(
 46        default=None,
 47        description="The ID of the fine-tune job on the provider's side. May not be the same as the fine_tune_model_id.",
 48    )
 49    fine_tune_model_id: str | None = Field(
 50        default=None,
 51        description="The ID of the fine-tuned model on the provider's side. May not be the same as the provider_id.",
 52    )
 53    dataset_split_id: str = Field(
 54        description="The ID of the dataset split to use for this fine-tune.",
 55    )
 56    train_split_name: str = Field(
 57        default="train",
 58        description="The name of the training split to use for this fine-tune.",
 59    )
 60    validation_split_name: str | None = Field(
 61        default=None,
 62        description="The name of the validation split to use for this fine-tune. Optional.",
 63    )
 64    parameters: dict[str, str | int | float | bool] = Field(
 65        default={},
 66        description="The parameters to use for this fine-tune. These are provider-specific.",
 67    )
 68    # These two fields are saved exactly as used for training. Even if they map exactly to a custom prompt or generator, those can change, so we want to keep a record of the training prompt.
 69    system_message: str = Field(
 70        description="The system message to use for this fine-tune.",
 71    )
 72    thinking_instructions: str | None = Field(
 73        default=None,
 74        description="The thinking instructions to use for this fine-tune. Only used when data_strategy is final_and_intermediate.",
 75    )
 76    latest_status: FineTuneStatusType = Field(
 77        default=FineTuneStatusType.unknown,
 78        description="The latest known status of this fine-tune. Not updated in real time.",
 79    )
 80    properties: Dict[str, str | int | float] = Field(
 81        default={},
 82        description="Properties of the fine-tune. Different providers may use different properties.",
 83    )
 84    data_strategy: ChatStrategy = Field(
 85        default=ChatStrategy.single_turn,
 86        description="The strategy to use for training the model. 'final_only' will only train on the final response. 'final_and_intermediate' will train on the final response and intermediate outputs (chain of thought or reasoning).",
 87    )
 88
 89    # Workaround to return typed parent without importing Task
 90    def parent_task(self) -> Union["Task", None]:
 91        if self.parent is None or self.parent.__class__.__name__ != "Task":
 92            return None
 93        return self.parent  # type: ignore
 94
 95    @model_validator(mode="after")
 96    def validate_thinking_instructions(self) -> Self:
 97        if (
 98            self.thinking_instructions is not None
 99            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
100        ):
101            raise ValueError(
102                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
103            )
104        if (
105            self.thinking_instructions is None
106            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
107        ):
108            raise ValueError(
109                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
110            )
111        return self

The Kiln fine-tune datamodel.

Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
structured_output_mode: StructuredOutputMode | None
provider: str
base_model_id: str
provider_id: str | None
fine_tune_model_id: str | None
dataset_split_id: str
train_split_name: str
validation_split_name: str | None
parameters: dict[str, str | int | float | bool]
system_message: str
thinking_instructions: str | None
latest_status: FineTuneStatusType
properties: Dict[str, str | int | float]
data_strategy: kiln_ai.datamodel.datamodel_enums.ChatStrategy
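
A minimal record can be built from the required fields above. This is a sketch: the provider name, model ID, and dataset split ID are placeholders, and it assumes the default single_turn data strategy does not require thinking instructions.

    from kiln_ai.datamodel import Finetune

    ft = Finetune(
        name="my_finetune",
        provider="openai",                       # placeholder provider
        base_model_id="gpt-4o-mini-2024-07-18",  # placeholder provider model ID
        dataset_split_id="123456789",            # placeholder dataset split ID
        system_message="You are a helpful assistant.",
    )
    print(ft.latest_status)  # FineTuneStatusType.unknown until a status update is fetched
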
def parent_task(self) -> Optional[Task]:
90    def parent_task(self) -> Union["Task", None]:
91        if self.parent is None or self.parent.__class__.__name__ != "Task":
92            return None
93        return self.parent  # type: ignore
@model_validator(mode='after')
def validate_thinking_instructions(self) -> Self:
 95    @model_validator(mode="after")
 96    def validate_thinking_instructions(self) -> Self:
 97        if (
 98            self.thinking_instructions is not None
 99            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
100        ):
101            raise ValueError(
102                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
103            )
104        if (
105            self.thinking_instructions is None
106            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
107        ):
108            raise ValueError(
109                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
110            )
111        return self
def relationship_name() -> str:
713        def relationship_name_method() -> str:
714            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
706        def parent_class_method() -> Type[KilnParentModel]:
707            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args: self: The BaseModel instance. context: The context.

class Priority(enum.IntEnum):
 9class Priority(IntEnum):
10    """Defines priority levels for tasks and requirements, where P0 is highest priority."""
11
12    p0 = 0
13    p1 = 1
14    p2 = 2
15    p3 = 3

Defines priority levels for tasks and requirements, where P0 is highest priority.

p0 = <Priority.p0: 0>
p1 = <Priority.p1: 1>
p2 = <Priority.p2: 2>
p3 = <Priority.p3: 3>
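
Because Priority is an IntEnum, priorities compare numerically, and the most urgent priority has the lowest value:

    from kiln_ai.datamodel import Priority

    assert Priority.p0 < Priority.p1
    most_urgent = min([Priority.p2, Priority.p0, Priority.p3])
    assert most_urgent == Priority.p0
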
class Project(kiln_ai.datamodel.basemodel.KilnParentModel):
15class Project(
16    KilnParentModel,
17    parent_of={
18        "tasks": Task,
19        "documents": Document,
20        "extractor_configs": ExtractorConfig,
21        "chunker_configs": ChunkerConfig,
22        "embedding_configs": EmbeddingConfig,
23        "rag_configs": RagConfig,
24        "vector_store_configs": VectorStoreConfig,
25        "external_tool_servers": ExternalToolServer,
26        "reranker_configs": RerankerConfig,
27    },
28):
29    """
30    A collection of related tasks.
31
32    Projects organize tasks into logical groups and provide high-level descriptions
33    of the overall goals.
34    """
35
36    name: FilenameString = Field(description="The name of the project.")
37    description: str | None = Field(
38        default=None,
39        description="A description of the project for you and your team. Will not be used in prompts/training/validation.",
40    )
41
42    # Needed for typechecking. We should fix this in KilnParentModel
43    def tasks(self, readonly: bool = False) -> list[Task]:
44        return super().tasks(readonly=readonly)  # type: ignore
45
46    def documents(self, readonly: bool = False) -> list[Document]:
47        return super().documents(readonly=readonly)  # type: ignore
48
49    def extractor_configs(self, readonly: bool = False) -> list[ExtractorConfig]:
50        return super().extractor_configs(readonly=readonly)  # type: ignore
51
52    def chunker_configs(self, readonly: bool = False) -> list[ChunkerConfig]:
53        return super().chunker_configs(readonly=readonly)  # type: ignore
54
55    def embedding_configs(self, readonly: bool = False) -> list[EmbeddingConfig]:
56        return super().embedding_configs(readonly=readonly)  # type: ignore
57
58    def vector_store_configs(self, readonly: bool = False) -> list[VectorStoreConfig]:
59        return super().vector_store_configs(readonly=readonly)  # type: ignore
60
61    def rag_configs(self, readonly: bool = False) -> list[RagConfig]:
62        return super().rag_configs(readonly=readonly)  # type: ignore
63
64    def external_tool_servers(self, readonly: bool = False) -> list[ExternalToolServer]:
65        return super().external_tool_servers(readonly=readonly)  # type: ignore
66
67    def reranker_configs(self, readonly: bool = False) -> list[RerankerConfig]:
68        return super().reranker_configs(readonly=readonly)  # type: ignore

A collection of related tasks.

Projects organize tasks into logical groups and provide high-level descriptions of the overall goals.
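
Construction only needs a name; the child accessors below read from disk, so they become meaningful once the project has been saved to a path. A minimal sketch:

    from kiln_ai.datamodel import Project

    project = Project(
        name="support_bot",
        description="Customer support experiments",
    )
    # Child accessors (tasks, documents, rag_configs, ...) list models
    # parented under this project's saved path.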

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
def tasks(self, readonly=False) -> List[Task]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def documents(self, readonly=False) -> List[kiln_ai.datamodel.extraction.Document]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def extractor_configs( self, readonly=False) -> List[kiln_ai.datamodel.extraction.ExtractorConfig]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def chunker_configs(self, readonly=False) -> List[kiln_ai.datamodel.chunk.ChunkerConfig]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def embedding_configs( self, readonly=False) -> List[kiln_ai.datamodel.embedding.EmbeddingConfig]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def vector_store_configs( self, readonly=False) -> List[kiln_ai.datamodel.vector_store.VectorStoreConfig]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def rag_configs(self, readonly=False) -> List[kiln_ai.datamodel.rag.RagConfig]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def external_tool_servers( self, readonly=False) -> List[ExternalToolServer]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def reranker_configs(self, readonly=False) -> List[kiln_ai.datamodel.reranker.RerankerConfig]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class Prompt(kiln_ai.datamodel.basemodel.KilnParentedModel, kiln_ai.datamodel.BasePrompt):
33class Prompt(KilnParentedModel, BasePrompt):
34    """
35    A prompt for a task. This is the custom prompt parented by a task.
36    """
37
38    pass

A prompt for a task. This is the custom prompt parented by a task.

def relationship_name() -> str:
713        def relationship_name_method() -> str:
714            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
706        def parent_class_method() -> Type[KilnParentModel]:
707            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class PromptGenerators(builtins.str, enum.Enum):
 9class PromptGenerators(str, Enum):
10    SIMPLE = "simple_prompt_builder"
11    MULTI_SHOT = "multi_shot_prompt_builder"
12    FEW_SHOT = "few_shot_prompt_builder"
13    REPAIRS = "repairs_prompt_builder"
14    SIMPLE_CHAIN_OF_THOUGHT = "simple_chain_of_thought_prompt_builder"
15    FEW_SHOT_CHAIN_OF_THOUGHT = "few_shot_chain_of_thought_prompt_builder"
16    MULTI_SHOT_CHAIN_OF_THOUGHT = "multi_shot_chain_of_thought_prompt_builder"
17    SHORT = "short_prompt_builder"

Identifiers for the prompt generators built into Kiln.

SIMPLE = <PromptGenerators.SIMPLE: 'simple_prompt_builder'>
MULTI_SHOT = <PromptGenerators.MULTI_SHOT: 'multi_shot_prompt_builder'>
FEW_SHOT = <PromptGenerators.FEW_SHOT: 'few_shot_prompt_builder'>
REPAIRS = <PromptGenerators.REPAIRS: 'repairs_prompt_builder'>
SIMPLE_CHAIN_OF_THOUGHT = <PromptGenerators.SIMPLE_CHAIN_OF_THOUGHT: 'simple_chain_of_thought_prompt_builder'>
FEW_SHOT_CHAIN_OF_THOUGHT = <PromptGenerators.FEW_SHOT_CHAIN_OF_THOUGHT: 'few_shot_chain_of_thought_prompt_builder'>
MULTI_SHOT_CHAIN_OF_THOUGHT = <PromptGenerators.MULTI_SHOT_CHAIN_OF_THOUGHT: 'multi_shot_chain_of_thought_prompt_builder'>
SHORT = <PromptGenerators.SHORT: 'short_prompt_builder'>
PromptId = typing.Annotated[str, AfterValidator(func=<function <lambda>>)]
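
PromptId is a validated string alias. Generator identifiers, such as the PromptGenerators values above, are one accepted form; the full set of accepted ID formats is enforced by the validator and not shown in this reference.

    from kiln_ai.datamodel import PromptGenerators, PromptId

    prompt_id: PromptId = PromptGenerators.SIMPLE.value  # "simple_prompt_builder"
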
class RequirementRating(pydantic.main.BaseModel):
20class RequirementRating(BaseModel):
21    """Rating for a specific requirement within a task output."""
22
23    value: float = Field(
24        description="The rating value. Interpretation depends on rating type"
25    )
26    type: TaskOutputRatingType = Field(description="The type of rating")

Rating for a specific requirement within a task output.

value: float
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class StructuredOutputMode(builtins.str, enum.Enum):
28class StructuredOutputMode(str, Enum):
29    """
30    Enumeration of supported structured output modes.
31
32    - json_schema: request json using API capabilities for json_schema
33    - function_calling: request json using API capabilities for function calling
34    - json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema
35    - json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings.
36    - json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
37    - json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
38    - default: let the adapter decide (legacy, do not use for new use cases)
39    - unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should look up the best option at runtime.
40    """
41
42    default = "default"
43    json_schema = "json_schema"
44    function_calling_weak = "function_calling_weak"
45    function_calling = "function_calling"
46    json_mode = "json_mode"
47    json_instructions = "json_instructions"
48    json_instruction_and_object = "json_instruction_and_object"
49    json_custom_instructions = "json_custom_instructions"
50    unknown = "unknown"

Enumeration of supported structured output modes.

  • json_schema: request json using API capabilities for json_schema
  • function_calling: request json using API capabilities for function calling
  • json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema
  • json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings.
  • json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
  • json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
  • default: let the adapter decide (legacy, do not use for new use cases)
  • unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should look up the best option at runtime.

default = <StructuredOutputMode.default: 'default'>
json_schema = <StructuredOutputMode.json_schema: 'json_schema'>
function_calling_weak = <StructuredOutputMode.function_calling_weak: 'function_calling_weak'>
function_calling = <StructuredOutputMode.function_calling: 'function_calling'>
json_mode = <StructuredOutputMode.json_mode: 'json_mode'>
json_instructions = <StructuredOutputMode.json_instructions: 'json_instructions'>
json_instruction_and_object = <StructuredOutputMode.json_instruction_and_object: 'json_instruction_and_object'>
json_custom_instructions = <StructuredOutputMode.json_custom_instructions: 'json_custom_instructions'>
unknown = <StructuredOutputMode.unknown: 'unknown'>
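
As a str-valued enum, modes round-trip cleanly through serialized configuration:

    from kiln_ai.datamodel import StructuredOutputMode

    mode = StructuredOutputMode("json_schema")
    assert mode is StructuredOutputMode.json_schema
    assert mode == "json_schema"  # str subclass: compares equal to its value
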
class Task(kiln_ai.datamodel.basemodel.KilnParentedModel, kiln_ai.datamodel.basemodel.KilnParentModel):
103class Task(
104    KilnParentedModel,
105    KilnParentModel,
106    parent_of={
107        "runs": TaskRun,
108        "dataset_splits": DatasetSplit,
109        "finetunes": Finetune,
110        "prompts": Prompt,
111        "evals": Eval,
112        "run_configs": TaskRunConfig,
113    },
114):
115    """
116    Represents a specific task to be performed, with associated requirements and validation rules.
117
118    Contains the task definition, requirements, input/output schemas, and maintains
119    a collection of task runs.
120    """
121
122    name: FilenameString = Field(description="The name of the task.")
123    description: str | None = Field(
124        default=None,
125        description="A description of the task for you and your team. Will not be used in prompts/training/validation.",
126    )
127    instruction: str = Field(
128        min_length=1,
129        description="The instructions for the task. Will be used in prompts/training/validation.",
130    )
131    requirements: List[TaskRequirement] = Field(default=[])
132    # Output must be an object schema, as things like tool calls only allow objects
133    output_json_schema: JsonObjectSchema | None = None
134    # Inputs are more flexible, allowing arrays
135    input_json_schema: JsonSchema | None = None
136    thinking_instruction: str | None = Field(
137        default=None,
138        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting.",
139    )
140
141    default_run_config_id: ID_TYPE | None = Field(
142        default=None,
143        description="ID of the run config to use for this task by default. Must exist in saved run configs for this task.",
144    )
145
146    def output_schema(self) -> Dict | None:
147        if self.output_json_schema is None:
148            return None
149        return schema_from_json_str(self.output_json_schema)
150
151    def input_schema(self) -> Dict | None:
152        if self.input_json_schema is None:
153            return None
154        # Allow arrays, not just objects
155        return schema_from_json_str(self.input_json_schema, require_object=False)
156
157    # These wrappers help for typechecking. We should fix this in KilnParentModel
158    def runs(self, readonly: bool = False) -> list[TaskRun]:
159        return super().runs(readonly=readonly)  # type: ignore
160
161    def dataset_splits(self, readonly: bool = False) -> list[DatasetSplit]:
162        return super().dataset_splits(readonly=readonly)  # type: ignore
163
164    def finetunes(self, readonly: bool = False) -> list[Finetune]:
165        return super().finetunes(readonly=readonly)  # type: ignore
166
167    def prompts(self, readonly: bool = False) -> list[Prompt]:
168        return super().prompts(readonly=readonly)  # type: ignore
169
170    def evals(self, readonly: bool = False) -> list[Eval]:
171        return super().evals(readonly=readonly)  # type: ignore
172
173    def run_configs(self, readonly: bool = False) -> list[TaskRunConfig]:
174        return super().run_configs(readonly=readonly)  # type: ignore
175
176    # Workaround to return typed parent without importing Task
177    def parent_project(self) -> Union["Project", None]:
178        if self.parent is None or self.parent.__class__.__name__ != "Project":
179            return None
180        return self.parent  # type: ignore

Represents a specific task to be performed, with associated requirements and validation rules.

Contains the task definition, requirements, input/output schemas, and maintains a collection of task runs.
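
A task with structured output stores its schema as a JSON string and exposes the parsed form via output_schema(). A sketch with an illustrative schema:

    import json
    from kiln_ai.datamodel import Task

    answer_schema = json.dumps(
        {
            "type": "object",
            "properties": {"answer": {"type": "string"}},
            "required": ["answer"],
        }
    )
    task = Task(
        name="qa",
        instruction="Answer the user's question.",
        output_json_schema=answer_schema,
    )
    print(task.output_schema())  # parsed dict form of the schema, or None if unset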

name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
instruction: str
requirements: List[TaskRequirement]
output_json_schema: Optional[Annotated[str, AfterValidator(func=<function <lambda>>)]]
input_json_schema: Optional[Annotated[str, AfterValidator(func=<function <lambda>>)]]
thinking_instruction: str | None
default_run_config_id: Optional[str]
def output_schema(self) -> Optional[Dict]:
146    def output_schema(self) -> Dict | None:
147        if self.output_json_schema is None:
148            return None
149        return schema_from_json_str(self.output_json_schema)
def input_schema(self) -> Optional[Dict]:
151    def input_schema(self) -> Dict | None:
152        if self.input_json_schema is None:
153            return None
154        # Allow arrays, not just objects
155        return schema_from_json_str(self.input_json_schema, require_object=False)
def runs(self, readonly=False) -> List[TaskRun]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def dataset_splits( self, readonly=False) -> List[DatasetSplit]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def finetunes(self, readonly=False) -> List[Finetune]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def prompts(self, readonly=False) -> List[Prompt]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def evals(self, readonly=False) -> List[kiln_ai.datamodel.eval.Eval]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def run_configs(self, readonly=False) -> List[kiln_ai.datamodel.task.TaskRunConfig]:
695        def child_method(self, readonly: bool = False) -> list[child_class]:
696            return child_class.all_children_of_parent_path(self.path, readonly=readonly)

def parent_project(self) -> Optional[Project]:
177    def parent_project(self) -> Union["Project", None]:
178        if self.parent is None or self.parent.__class__.__name__ != "Project":
179            return None
180        return self.parent  # type: ignore
def relationship_name() -> str:
713        def relationship_name_method() -> str:
714            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
706        def parent_class_method() -> Type[KilnParentModel]:
707            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class TaskOutput(kiln_ai.datamodel.basemodel.KilnBaseModel):
322class TaskOutput(KilnBaseModel):
323    """
324    An output for a specific task run.
325
326    Contains the actual output content, its source (human or synthetic),
327    and optional rating information.
328    """
329
330    output: str = Field(
331        description="The output of the task. JSON formatted for structured output, plaintext for unstructured output."
332    )
333    source: DataSource | None = Field(
334        description="The source of the output: human or synthetic.",
335        default=None,
336    )
337    rating: TaskOutputRating | None = Field(
338        default=None, description="The rating of the output"
339    )
340
341    def validate_output_format(self, task: "Task") -> Self:
342        # validate output
343        if task.output_json_schema is not None:
344            try:
345                output_parsed = json.loads(self.output)
346            except json.JSONDecodeError:
347                raise ValueError("Output is not a valid JSON object")
348
349            validate_schema_with_value_error(
350                output_parsed,
351                task.output_json_schema,
352                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
353            )
354        return self
355
356    @model_validator(mode="after")
357    def validate_output_source(self, info: ValidationInfo) -> Self:
358        # On strict mode and not loaded from file, we validate output_source is not None.
359        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
360        if not strict_mode():
361            return self
362        if self.loaded_from_file(info):
363            return self
364        if self.source is None:
365            raise ValueError("Output source is required when strict mode is enabled")
366        return self

An output for a specific task run.

Contains the actual output content, its source (human or synthetic), and optional rating information.
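
Constructing an output with a rating looks like this (a sketch; note that under strict mode the source field is required, per the validator below):

    from kiln_ai.datamodel import TaskOutput, TaskOutputRating, TaskOutputRatingType

    output = TaskOutput(
        output='{"answer": "42"}',
        rating=TaskOutputRating(type=TaskOutputRatingType.five_star, value=5.0),
    )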

output: str
source: DataSource | None
rating: TaskOutputRating | None
def validate_output_format(self, task: Task) -> Self:
341    def validate_output_format(self, task: "Task") -> Self:
342        # validate output
343        if task.output_json_schema is not None:
344            try:
345                output_parsed = json.loads(self.output)
346            except json.JSONDecodeError:
347                raise ValueError("Output is not a valid JSON object")
348
349            validate_schema_with_value_error(
350                output_parsed,
351                task.output_json_schema,
352                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
353            )
354        return self
@model_validator(mode='after')
def validate_output_source(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
356    @model_validator(mode="after")
357    def validate_output_source(self, info: ValidationInfo) -> Self:
358        # On strict mode and not loaded from file, we validate output_source is not None.
359        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
360        if not strict_mode():
361            return self
362        if self.loaded_from_file(info):
363            return self
364        if self.source is None:
365            raise ValueError("Output source is required when strict mode is enabled")
366        return self
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class TaskOutputRating(kiln_ai.datamodel.basemodel.KilnBaseModel):
 50class TaskOutputRating(KilnBaseModel):
 51    """
 52    A rating for a task output, including an overall rating and ratings for each requirement.
 53
 54    Supports:
 55    - five_star: 1-5 star ratings
 56    - pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
 57    - pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
 58    """
 59
 60    type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star)
 61    value: float | None = Field(
 62        description="The rating value. Interpretation depends on rating type:\n- five_star: 1-5 stars\n- pass_fail: 1.0 (pass) or 0.0 (fail)\n- pass_fail_critical: 1.0 (pass), 0.0 (fail), or -1.0 (critical fail)",
 63        default=None,
 64    )
 65    requirement_ratings: Dict[ID_TYPE, RequirementRating] = Field(
 66        default={},
 67        description="The ratings of the requirements of the task. The ID can be either a task_requirement_id or a named rating for an eval_output_score name (in format 'named::<name>').",
 68    )
 69
 70    # Previously we stored rating values as a dict of floats, but now we store them as RequirementRating objects.
 71    @model_validator(mode="before")
 72    def upgrade_old_format(cls, data: dict) -> dict:
 73        if not isinstance(data, dict):
 74            return data
 75
 76        # Check if we have the old format (dict of floats)
 77        req_ratings = data.get("requirement_ratings", {})
 78        if req_ratings and all(
 79            isinstance(v, (int, float)) for v in req_ratings.values()
 80        ):
 81            # Convert each float to a RequirementRating object
 82            # all ratings are five star at the point we used this format
 83            data["requirement_ratings"] = {
 84                k: {"value": v, "type": TaskOutputRatingType.five_star}
 85                for k, v in req_ratings.items()
 86            }
 87
 88        return data
 89
 90    # Used to select high quality outputs for example selection (MultiShotPromptBuilder, etc)
 91    def is_high_quality(self) -> bool:
 92        if self.value is None:
 93            return False
 94
 95        if self.type == TaskOutputRatingType.five_star:
 96            return self.value >= 4
 97        elif self.type == TaskOutputRatingType.pass_fail:
 98            return self.value == 1.0
 99        elif self.type == TaskOutputRatingType.pass_fail_critical:
100            return self.value == 1.0
101        return False
102
103    @model_validator(mode="after")
104    def validate_rating(self) -> Self:
105        if self.type not in TaskOutputRatingType:
106            raise ValueError(f"Invalid rating type: {self.type}")
107
108        # Overall rating is optional
109        if self.value is not None:
110            self._validate_rating(self.type, self.value, "overall rating")
111
112        for req_id, req_rating in self.requirement_ratings.items():
113            self._validate_rating(
114                req_rating.type,
115                req_rating.value,
116                f"requirement rating for req ID: {req_id}",
117            )
118
119        return self
120
121    def _validate_rating(
122        self, type: TaskOutputRatingType, rating: float | None, rating_name: str
123    ) -> None:
124        if type == TaskOutputRatingType.five_star:
125            self._validate_five_star(rating, rating_name)
126        elif type == TaskOutputRatingType.pass_fail:
127            self._validate_pass_fail(rating, rating_name)
128        elif type == TaskOutputRatingType.pass_fail_critical:
129            self._validate_pass_fail_critical(rating, rating_name)
130
131    def _validate_five_star(self, rating: float | None, rating_name: str) -> None:
132        if rating is None or not isinstance(rating, float) or not rating.is_integer():
133            raise ValueError(
134                f"{rating_name.capitalize()} of type five_star must be an integer value (1-5)"
135            )
136        if rating < 1 or rating > 5:
137            raise ValueError(
138                f"{rating_name.capitalize()} of type five_star must be between 1 and 5 stars"
139            )
140
141    def _validate_pass_fail(self, rating: float | None, rating_name: str) -> None:
142        if rating is None or not isinstance(rating, float) or not rating.is_integer():
143            raise ValueError(
144                f"{rating_name.capitalize()} of type pass_fail must be an integer value (0 or 1)"
145            )
146        if rating not in [0, 1]:
147            raise ValueError(
148                f"{rating_name.capitalize()} of type pass_fail must be 0 (fail) or 1 (pass)"
149            )
150
151    def _validate_pass_fail_critical(
152        self, rating: float | None, rating_name: str
153    ) -> None:
154        if rating is None or not isinstance(rating, float) or not rating.is_integer():
155            raise ValueError(
156                f"{rating_name.capitalize()} of type pass_fail_critical must be an integer value (-1, 0, or 1)"
157            )
158        if rating not in [-1, 0, 1]:
159            raise ValueError(
160                f"{rating_name.capitalize()} of type pass_fail_critical must be -1 (critical fail), 0 (fail), or 1 (pass)"
161            )

A rating for a task output, including an overall rating and ratings for each requirement.

Supports:

  • five_star: 1-5 star ratings
  • pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
  • pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
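
The cutoffs follow directly from is_high_quality below: four or more stars, or a passing grade.

    from kiln_ai.datamodel import TaskOutputRating, TaskOutputRatingType

    good = TaskOutputRating(type=TaskOutputRatingType.five_star, value=4.0)
    assert good.is_high_quality()  # five_star ratings of 4 or 5 count as high quality

    failed = TaskOutputRating(type=TaskOutputRatingType.pass_fail, value=0.0)
    assert not failed.is_high_quality()
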
value: float | None
requirement_ratings: Dict[Optional[str], RequirementRating]
@model_validator(mode='before')
def upgrade_old_format(cls, data: dict) -> dict:
71    @model_validator(mode="before")
72    def upgrade_old_format(cls, data: dict) -> dict:
73        if not isinstance(data, dict):
74            return data
75
76        # Check if we have the old format (dict of floats)
77        req_ratings = data.get("requirement_ratings", {})
78        if req_ratings and all(
79            isinstance(v, (int, float)) for v in req_ratings.values()
80        ):
81            # Convert each float to a RequirementRating object
82            # all ratings are five star at the point we used this format
83            data["requirement_ratings"] = {
84                k: {"value": v, "type": TaskOutputRatingType.five_star}
85                for k, v in req_ratings.items()
86            }
87
88        return data
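
A legacy record with bare float ratings therefore validates into the new shape (sketch; the requirement ID is a placeholder):

    from kiln_ai.datamodel import TaskOutputRating, TaskOutputRatingType

    legacy = {"requirement_ratings": {"req_123": 5.0}}
    upgraded = TaskOutputRating.model_validate(legacy)
    assert upgraded.requirement_ratings["req_123"].value == 5.0
    assert upgraded.requirement_ratings["req_123"].type == TaskOutputRatingType.five_star
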
def is_high_quality(self) -> bool:
 91    def is_high_quality(self) -> bool:
 92        if self.value is None:
 93            return False
 94
 95        if self.type == TaskOutputRatingType.five_star:
 96            return self.value >= 4
 97        elif self.type == TaskOutputRatingType.pass_fail:
 98            return self.value == 1.0
 99        elif self.type == TaskOutputRatingType.pass_fail_critical:
100            return self.value == 1.0
101        return False
@model_validator(mode='after')
def validate_rating(self) -> Self:
103    @model_validator(mode="after")
104    def validate_rating(self) -> Self:
105        if self.type not in TaskOutputRatingType:
106            raise ValueError(f"Invalid rating type: {self.type}")
107
108        # Overall rating is optional
109        if self.value is not None:
110            self._validate_rating(self.type, self.value, "overall rating")
111
112        for req_id, req_rating in self.requirement_ratings.items():
113            self._validate_rating(
114                req_rating.type,
115                req_rating.value,
116                f"requirement rating for req ID: {req_id}",
117            )
118
119        return self
model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class TaskOutputRatingType(builtins.str, enum.Enum):
19class TaskOutputRatingType(str, Enum):
20    """Defines the types of rating systems available for task outputs."""
21
22    five_star = "five_star"
23    pass_fail = "pass_fail"
24    pass_fail_critical = "pass_fail_critical"
25    custom = "custom"

Defines the types of rating systems available for task outputs.

five_star = <TaskOutputRatingType.five_star: 'five_star'>
pass_fail = <TaskOutputRatingType.pass_fail: 'pass_fail'>
pass_fail_critical = <TaskOutputRatingType.pass_fail_critical: 'pass_fail_critical'>
custom = <TaskOutputRatingType.custom: 'custom'>
class TaskRequirement(pydantic.main.BaseModel):
35class TaskRequirement(BaseModel):
36    """
37    Defines a specific requirement that should be met by task outputs.
38
39    Includes an identifier, name, description, instruction for meeting the requirement,
40    priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
41    """
42
43    id: ID_TYPE = ID_FIELD
44    name: FilenameStringShort = Field(description="The name of the task requirement.")
45    description: str | None = Field(default=None)
46    instruction: str = Field(min_length=1)
47    priority: Priority = Field(default=Priority.p2)
48    type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star)

Defines a specific requirement that should be met by task outputs.

Includes an identifier, name, description, instruction for meeting the requirement, priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
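
For example, a pass/fail requirement at elevated priority (a sketch):

    from kiln_ai.datamodel import Priority, TaskOutputRatingType, TaskRequirement

    requirement = TaskRequirement(
        name="concise",
        instruction="Answers must be under 100 words.",
        priority=Priority.p1,
        type=TaskOutputRatingType.pass_fail,
    )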

id: Optional[str]
name: Annotated[str, BeforeValidator(func=<function name_validator.<locals>.fn>, json_schema_input_type=PydanticUndefined)]
description: str | None
instruction: str
priority: Priority
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

class TaskRun(kiln_ai.datamodel.basemodel.KilnParentedModel):
 77class TaskRun(KilnParentedModel):
 78    """
 79    Represents a single execution of a Task.
 80
 81    Contains the input used, its source, the output produced, and optional
 82    repair information if the output needed correction.
 83    """
 84
 85    input: str = Field(
 86        description="The inputs to the task. JSON formatted for structured input, plaintext for unstructured input."
 87    )
 88    input_source: DataSource | None = Field(
 89        default=None, description="The source of the input: human or synthetic."
 90    )
 91
 92    output: TaskOutput = Field(description="The output of the task run.")
 93    repair_instructions: str | None = Field(
 94        default=None,
 95        description="Instructions for fixing the output. Should define what is wrong, and how to fix it. Will be used by models for both generating a fixed output, and evaluating future models.",
 96    )
 97    repaired_output: TaskOutput | None = Field(
 98        default=None,
 99        description="An version of the output with issues fixed. This must be a 'fixed' version of the existing output, and not an entirely new output. If you wish to generate an ideal curatorial output for this task unrelated to this output, generate a new TaskOutput with type 'human' instead of using this field.",
100    )
101    intermediate_outputs: Dict[str, str] | None = Field(
102        default=None,
103        description="Intermediate outputs from the task run. Keys are the names of the intermediate output steps (cot=chain of thought, etc), values are the output data.",
104    )
105    tags: List[str] = Field(
106        default=[],
107        description="Tags for the task run. Tags are used to categorize task runs for filtering and reporting.",
108    )
109    usage: Usage | None = Field(
110        default=None,
111        description="Usage information for the task run. This includes the number of input tokens, output tokens, and total tokens used.",
112    )
113    trace: list[ChatCompletionMessageParam] | None = Field(
114        default=None,
115        description="The trace of the task run in OpenAI format. This is the list of messages that were sent to/from the model.",
116    )
117
118    def thinking_training_data(self) -> str | None:
119        """
120        Get the thinking training data from the task run.
121        """
122        if self.intermediate_outputs is None:
123            return None
124        return self.intermediate_outputs.get(
125            "reasoning"
126        ) or self.intermediate_outputs.get("chain_of_thought")
127
128    def has_thinking_training_data(self) -> bool:
129        """
130        Does this run have thinking data that we can use to train a thinking model?
131        """
132        return self.thinking_training_data() is not None
133
134    # Workaround to return typed parent without importing Task
135    def parent_task(self) -> Union["Task", None]:
136        if self.parent is None or self.parent.__class__.__name__ != "Task":
137            return None
138        return self.parent  # type: ignore
139
140    @model_validator(mode="after")
141    def validate_input_format(self, info: ValidationInfo) -> Self:
142        # Don't validate if loading from file (not new). Too slow.
143        # We don't allow changing task schema, so this is redundant validation.
144        # Note: we still validate if editing a loaded model
145        if self.loading_from_file(info):
146            # Consider loading an existing model as validated.
147            self._last_validated_input = self.input
148            return self
149
150        # Don't validate if input has not changed. Too slow to run this every time.
151        if (
152            hasattr(self, "_last_validated_input")
153            and self.input == self._last_validated_input
154        ):
155            return self
156
157        task = self.parent_task()
158        if task is None:
159            # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
160            return self
161
162        # validate input
163        if task.input_json_schema is not None:
164            try:
165                input_parsed = json.loads(self.input)
166            except json.JSONDecodeError:
167                raise ValueError("Input is not a valid JSON object")
168
169            validate_schema_with_value_error(
170                input_parsed,
171                task.input_json_schema,
172                "Input does not match task input schema.",
173                require_object=False,
174            )
175
176        self._last_validated_input = self.input
177        return self
178
179    @model_validator(mode="after")
180    def validate_output_format(self, info: ValidationInfo) -> Self:
181        # Don't validate if loading from file (not new). Too slow.
182        # Note: we still validate if editing a loaded model's output.
183        if self.loading_from_file(info):
184            # Consider loading an existing model as validated.
185            self._last_validated_output = self.output.output if self.output else None
186            return self
187
188        # Don't validate unless output has changed since last validation.
189        # The validator is slow and costly, don't want it running when setting other fields.
190        if (
191            hasattr(self, "_last_validated_output")
192            and self.output is not None
193            and self.output.output == self._last_validated_output
194        ):
195            return self
196
197        task = self.parent_task()
198        if task is None:
199            return self
200
201        self.output.validate_output_format(task)
202        self._last_validated_output = self.output.output if self.output else None
203        return self
204
205    @model_validator(mode="after")
206    def validate_repaired_output(self) -> Self:
207        if self.repaired_output is not None:
208            if self.repaired_output.rating is not None:
209                raise ValueError(
210                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
211                )
212
213            task = self.parent_task()
214            if (
215                task is not None
216                and self.repaired_output.output is not None
217                and task.output_json_schema is not None
218            ):
219                try:
220                    output_parsed = json.loads(self.repaired_output.output)
221                except json.JSONDecodeError:
222                    raise ValueError("Repaired output is not a valid JSON object")
223
224                validate_schema_with_value_error(
225                    output_parsed,
226                    task.output_json_schema,
227                    "Repaired output does not match task output schema.",
228                )
229
230        if self.repair_instructions is None and self.repaired_output is not None:
231            raise ValueError(
232                "Repair instructions are required if providing a repaired output."
233            )
234        if self.repair_instructions is not None and self.repaired_output is None:
235            raise ValueError(
236                "A repaired output is required if providing repair instructions."
237            )
238
239        return self
240
241    @model_validator(mode="after")
242    def validate_input_source(self, info: ValidationInfo) -> Self:
243        # On strict mode and not loaded from file, we validate input_source is not None.
244        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
245        if not strict_mode():
246            return self
247        if self.loaded_from_file(info):
248            return self
249        if self.input_source is None:
250            raise ValueError("input_source is required when strict mode is enabled")
251        return self
252
253    @model_validator(mode="after")
254    def validate_tags(self) -> Self:
255        for tag in self.tags:
256            if not tag:
257                raise ValueError("Tags cannot be empty strings")
258            if " " in tag:
259                raise ValueError("Tags cannot contain spaces. Try underscores.")
260
261        return self

Represents a single execution of a Task.

Contains the input used, its source, the output produced, and optional repair information if the output needed correction.

input: str
input_source: DataSource | None
output: TaskOutput
repair_instructions: str | None
repaired_output: TaskOutput | None
intermediate_outputs: Dict[str, str] | None
tags: List[str]
usage: Usage | None
trace: list[ChatCompletionMessageParam] | None (a union of the OpenAI chat message param types, with Kiln wrappers for the assistant and tool message types)
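
For orientation, a minimal construction sketch. Field values are illustrative; DataSourceType.human and the created_by property are assumptions here, so check DataSource's documentation for the exact requirements per source type:

    from kiln_ai.datamodel import DataSource, DataSourceType, TaskOutput, TaskRun

    run = TaskRun(
        input='{"topic": "context windows"}',    # plaintext, or JSON matching the task's input schema
        input_source=DataSource(
            type=DataSourceType.human,           # assumed enum member
            properties={"created_by": "alice"},  # assumed property key for human sources
        ),
        output=TaskOutput(output='{"summary": "..."}'),
        tags=["demo_run"],                       # non-empty, no spaces (see validate_tags)
    )
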
def thinking_training_data(self) -> str | None:
118    def thinking_training_data(self) -> str | None:
119        """
120        Get the thinking training data from the task run.
121        """
122        if self.intermediate_outputs is None:
123            return None
124        return self.intermediate_outputs.get(
125            "reasoning"
126        ) or self.intermediate_outputs.get("chain_of_thought")

Get the thinking training data from the task run.

def has_thinking_training_data(self) -> bool:
128    def has_thinking_training_data(self) -> bool:
129        """
130        Does this run have thinking data that we can use to train a thinking model?
131        """
132        return self.thinking_training_data() is not None

Does this run have thinking data that we can use to train a thinking model?
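
Both helpers read from intermediate_outputs, and "reasoning" takes precedence over "chain_of_thought" because thinking_training_data tries "reasoning" first. An illustrative sketch, reusing the run from the construction example above:

    run.intermediate_outputs = {"chain_of_thought": "First, list the constraints..."}
    assert run.thinking_training_data() == "First, list the constraints..."
    assert run.has_thinking_training_data()

    # Note: an empty-string "reasoning" value is falsy, so it falls through to
    # "chain_of_thought"; the check is on truthiness, not key presence.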

def parent_task(self) -> Optional[Task]:
135    def parent_task(self) -> Union["Task", None]:
136        if self.parent is None or self.parent.__class__.__name__ != "Task":
137            return None
138        return self.parent  # type: ignore
@model_validator(mode='after')
def validate_input_format(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
140    @model_validator(mode="after")
141    def validate_input_format(self, info: ValidationInfo) -> Self:
142        # Don't validate if loading from file (not new). Too slow.
143        # We don't allow changing task schema, so this is redundant validation.
144        # Note: we still validate if editing a loaded model
145        if self.loading_from_file(info):
146            # Consider loading an existing model as validated.
147            self._last_validated_input = self.input
148            return self
149
150        # Don't validate if input has not changed. Too slow to run this every time.
151        if (
152            hasattr(self, "_last_validated_input")
153            and self.input == self._last_validated_input
154        ):
155            return self
156
157        task = self.parent_task()
158        if task is None:
159            # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
160            return self
161
162        # validate input
163        if task.input_json_schema is not None:
164            try:
165                input_parsed = json.loads(self.input)
166            except json.JSONDecodeError:
167                raise ValueError("Input is not a valid JSON object")
168
169            validate_schema_with_value_error(
170                input_parsed,
171                task.input_json_schema,
172                "Input does not match task input schema.",
173                require_object=False,
174            )
175
176        self._last_validated_input = self.input
177        return self
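
The validator's first step, in isolation: input must parse as JSON before schema validation runs. A stdlib-only sketch of that decode check (this is not the library's own helper):

    import json

    def parses_as_json(raw: str) -> bool:
        # Mirrors the decode check above; TaskRun raises
        # ValueError("Input is not a valid JSON object") on failure.
        try:
            json.loads(raw)
            return True
        except json.JSONDecodeError:
            return False

    assert parses_as_json('{"topic": "context windows"}')
    assert not parses_as_json("not json")
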
@model_validator(mode='after')
def validate_output_format(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
179    @model_validator(mode="after")
180    def validate_output_format(self, info: ValidationInfo) -> Self:
181        # Don't validate if loading from file (not new). Too slow.
182        # Note: we still validate if editing a loaded model's output.
183        if self.loading_from_file(info):
184            # Consider loading an existing model as validated.
185            self._last_validated_output = self.output.output if self.output else None
186            return self
187
188        # Don't validate unless output has changed since last validation.
189        # The validator is slow and costly, don't want it running when setting other fields.
190        if (
191            hasattr(self, "_last_validated_output")
192            and self.output is not None
193            and self.output.output == self._last_validated_output
194        ):
195            return self
196
197        task = self.parent_task()
198        if task is None:
199            return self
200
201        self.output.validate_output_format(task)
202        self._last_validated_output = self.output.output if self.output else None
203        return self
@model_validator(mode='after')
def validate_repaired_output(self) -> Self:
205    @model_validator(mode="after")
206    def validate_repaired_output(self) -> Self:
207        if self.repaired_output is not None:
208            if self.repaired_output.rating is not None:
209                raise ValueError(
210                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
211                )
212
213            task = self.parent_task()
214            if (
215                task is not None
216                and self.repaired_output.output is not None
217                and task.output_json_schema is not None
218            ):
219                try:
220                    output_parsed = json.loads(self.repaired_output.output)
221                except json.JSONDecodeError:
222                    raise ValueError("Repaired output is not a valid JSON object")
223
224                validate_schema_with_value_error(
225                    output_parsed,
226                    task.output_json_schema,
227                    "Repaired output does not match task output schema.",
228                )
229
230        if self.repair_instructions is None and self.repaired_output is not None:
231            raise ValueError(
232                "Repair instructions are required if providing a repaired output."
233            )
234        if self.repair_instructions is not None and self.repaired_output is None:
235            raise ValueError(
236                "A repaired output is required if providing repair instructions."
237            )
238
239        return self
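
To summarize the rules above: repair_instructions and repaired_output must be provided together, the repaired output must satisfy the task's output schema, and its rating must stay None. A sketch of a valid pairing (values are illustrative; passing only an output string to TaskOutput assumes non-strict mode):

    from kiln_ai.datamodel import TaskOutput

    repair_update = {
        "repair_instructions": "Return ISO-8601 dates.",
        "repaired_output": TaskOutput(output='{"date": "2024-01-01"}'),  # no rating set
    }
    # Setting only one of the two fields on a TaskRun raises a ValueError,
    # as does providing a repaired output that carries a rating.
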
@model_validator(mode='after')
def validate_input_source(self, info: pydantic_core.core_schema.ValidationInfo) -> Self:
241    @model_validator(mode="after")
242    def validate_input_source(self, info: ValidationInfo) -> Self:
243        # On strict mode and not loaded from file, we validate input_source is not None.
244        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
245        if not strict_mode():
246            return self
247        if self.loaded_from_file(info):
248            return self
249        if self.input_source is None:
250            raise ValueError("input_source is required when strict mode is enabled")
251        return self
@model_validator(mode='after')
def validate_tags(self) -> Self:
253    @model_validator(mode="after")
254    def validate_tags(self) -> Self:
255        for tag in self.tags:
256            if not tag:
257                raise ValueError("Tags cannot be empty strings")
258            if " " in tag:
259                raise ValueError("Tags cannot contain spaces. Try underscores.")
260
261        return self
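
Because model_config sets validate_assignment, these tag rules run on assignment as well as at construction. A short sketch, reusing the run from the construction example:

    run.tags = ["nightly_eval", "regression_v2"]  # OK: non-empty, no spaces
    # run.tags = ["bad tag"]  # would raise: "Tags cannot contain spaces. Try underscores."
    # run.tags = [""]         # would raise: "Tags cannot be empty strings"
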
def relationship_name() -> str:
713        def relationship_name_method() -> str:
714            return relationship_name

def parent_type() -> Type[kiln_ai.datamodel.basemodel.KilnParentModel]:
706        def parent_class_method() -> Type[KilnParentModel]:
707            return cls

model_config = {'validate_assignment': True}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].

def model_post_init(self: pydantic.main.BaseModel, context: Any, /) -> None:
337def init_private_attributes(self: BaseModel, context: Any, /) -> None:
338    """This function is meant to behave like a BaseModel method to initialise private attributes.
339
340    It takes context as an argument since that's what pydantic-core passes when calling it.
341
342    Args:
343        self: The BaseModel instance.
344        context: The context.
345    """
346    if getattr(self, '__pydantic_private__', None) is None:
347        pydantic_private = {}
348        for name, private_attr in self.__private_attributes__.items():
349            default = private_attr.get_default()
350            if default is not PydanticUndefined:
351                pydantic_private[name] = default
352        object_setattr(self, '__pydantic_private__', pydantic_private)

This function is meant to behave like a BaseModel method to initialise private attributes.

It takes context as an argument since that's what pydantic-core passes when calling it.

Args:
    self: The BaseModel instance.
    context: The context.

class Usage(pydantic.main.BaseModel):
18class Usage(BaseModel):
19    input_tokens: int | None = Field(
20        default=None,
21        description="The number of input tokens used in the task run.",
22        ge=0,
23    )
24    output_tokens: int | None = Field(
25        default=None,
26        description="The number of output tokens used in the task run.",
27        ge=0,
28    )
29    total_tokens: int | None = Field(
30        default=None,
31        description="The total number of tokens used in the task run.",
32        ge=0,
33    )
34    cost: float | None = Field(
35        default=None,
36        description="The cost of the task run in US dollars, saved at runtime (prices can change over time).",
37        ge=0,
38    )
39
40    def __add__(self, other: "Usage") -> "Usage":
41        """Add two Usage objects together, handling None values gracefully.
42
43        None + None = None
44        None + value = value
45        value + None = value
46        value1 + value2 = value1 + value2
47        """
48        if not isinstance(other, Usage):
49            raise TypeError(f"Cannot add Usage with {type(other).__name__}")
50
51        def _add_optional_int(a: int | None, b: int | None) -> int | None:
52            if a is None and b is None:
53                return None
54            if a is None:
55                return b
56            if b is None:
57                return a
58            return a + b
59
60        def _add_optional_float(a: float | None, b: float | None) -> float | None:
61            if a is None and b is None:
62                return None
63            if a is None:
64                return b
65            if b is None:
66                return a
67            return a + b
68
69        return Usage(
70            input_tokens=_add_optional_int(self.input_tokens, other.input_tokens),
71            output_tokens=_add_optional_int(self.output_tokens, other.output_tokens),
72            total_tokens=_add_optional_int(self.total_tokens, other.total_tokens),
73            cost=_add_optional_float(self.cost, other.cost),
74        )

Token and cost accounting for a single task run. All fields are optional: input_tokens, output_tokens, total_tokens, and cost in US dollars (saved at runtime, since provider prices change over time). Two Usage objects can be added together; None values are treated as missing rather than zero.

input_tokens: int | None
output_tokens: int | None
total_tokens: int | None
cost: float | None
model_config: ClassVar[pydantic.config.ConfigDict] = {}

Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
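
The addition semantics make it straightforward to aggregate usage across runs. A minimal sketch with illustrative values:

    from kiln_ai.datamodel import Usage

    a = Usage(input_tokens=100, output_tokens=50, total_tokens=150, cost=0.002)
    b = Usage(input_tokens=40, total_tokens=40)  # output_tokens and cost default to None

    combined = a + b
    assert combined.input_tokens == 140  # 100 + 40
    assert combined.output_tokens == 50  # None + value = value
    assert combined.total_tokens == 190
    assert combined.cost == 0.002        # a None side carries the other value through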

def generate_model_id() -> str:
36def generate_model_id() -> str:
37    return str(uuid.uuid4().int)[:12]
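
Illustrative usage: the ID is the first 12 digits of a random UUID's integer form, returned as a string.

    from kiln_ai.datamodel import generate_model_id

    new_id = generate_model_id()
    print(new_id)  # e.g. "314159265358": digits only, typically 12 characters
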
prompt_generator_values = [
    'simple_prompt_builder',
    'multi_shot_prompt_builder',
    'few_shot_prompt_builder',
    'repairs_prompt_builder',
    'simple_chain_of_thought_prompt_builder',
    'few_shot_chain_of_thought_prompt_builder',
    'multi_shot_chain_of_thought_prompt_builder',
    'short_prompt_builder',
]