kiln_ai.datamodel
See our docs for details about our datamodel classes and hierarchy:
Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html

User docs: https://docs.kiln.tech/developers/kiln-datamodel
1""" 2See our docs for details about our datamodel classes and hierarchy: 3 4Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html 5 6User docs: https://docs.kiln.tech/developers/kiln-datamodel 7""" 8 9# This component uses "flat" imports so we don't have too much internal structure exposed in the API. 10# for example you can just `from datamodel import Task, Project` instead of `from datamodel.task import Task; from datamodel.project import Project` 11 12from __future__ import annotations 13 14from kiln_ai.datamodel import ( 15 chunk, 16 dataset_split, 17 embedding, 18 eval, 19 extraction, 20 rag, 21 strict_mode, 22) 23from kiln_ai.datamodel.datamodel_enums import ( 24 FineTuneStatusType, 25 Priority, 26 StructuredOutputMode, 27 TaskOutputRatingType, 28) 29from kiln_ai.datamodel.dataset_split import DatasetSplit, DatasetSplitDefinition 30from kiln_ai.datamodel.external_tool_server import ExternalToolServer 31from kiln_ai.datamodel.finetune import Finetune 32from kiln_ai.datamodel.project import Project 33from kiln_ai.datamodel.prompt import BasePrompt, Prompt 34from kiln_ai.datamodel.prompt_id import ( 35 PromptGenerators, 36 PromptId, 37 prompt_generator_values, 38) 39from kiln_ai.datamodel.task import Task, TaskRequirement 40from kiln_ai.datamodel.task_output import ( 41 DataSource, 42 DataSourceProperty, 43 DataSourceType, 44 RequirementRating, 45 TaskOutput, 46 TaskOutputRating, 47) 48from kiln_ai.datamodel.task_run import TaskRun, Usage 49 50__all__ = [ 51 "BasePrompt", 52 "DataSource", 53 "DataSourceProperty", 54 "DataSourceType", 55 "DatasetSplit", 56 "DatasetSplitDefinition", 57 "ExternalToolServer", 58 "FineTuneStatusType", 59 "Finetune", 60 "Priority", 61 "Project", 62 "Prompt", 63 "PromptGenerators", 64 "PromptId", 65 "RequirementRating", 66 "StructuredOutputMode", 67 "Task", 68 "TaskOutput", 69 "TaskOutputRating", 70 "TaskOutputRatingType", 71 "TaskRequirement", 72 "TaskRun", 73 "Usage", 74 "chunk", 75 "dataset_split", 76 "embedding", 77 "eval", 78 "extraction", 79 "prompt_generator_values", 80 "rag", 81 "strict_mode", 82]
class BasePrompt(BaseModel):
    """
    A prompt for a task. This is the basic data storage format which can be used throughout a project.

    The "Prompt" model name is reserved for the custom prompts parented by a task.
    """

    name: FilenameString = Field(description="The name of the prompt.")
    description: str | None = Field(
        default=None,
        description="A more detailed description of the prompt.",
    )
    generator_id: str | None = Field(
        default=None,
        description="The id of the generator that created this prompt.",
    )
    prompt: str = Field(
        description="The prompt for the task.",
        min_length=1,
    )
    chain_of_thought_instructions: str | None = Field(
        default=None,
        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting. COT will not be used unless this is provided.",
    )
A prompt for a task. This is the basic data storage format which can be used throughout a project.
The "Prompt" model name is reserved for the custom prompts parented by a task.
class DataSource(BaseModel):
    """
    Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

    Properties vary based on the source type - for synthetic/tool_call sources this includes
    model information, for human sources this includes creator information, for file imports
    this includes file information.
    """

    type: DataSourceType
    properties: Dict[str, str | int | float] = Field(
        default={},
        description="Properties describing the data source. For synthetic things like model. For human: the human's name. For file_import: file information.",
    )
    run_config: Optional[RunConfigProperties] = Field(
        default=None,
        description="The run config used to generate the data, if generated by running a model in Kiln (only true for type=synthetic).",
    )

    _data_source_properties = [
        DataSourceProperty(
            name="created_by",
            type=str,
            required_for=[DataSourceType.human],
            not_allowed_for=[
                DataSourceType.synthetic,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_provider",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="adapter_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # Legacy field -- allow loading from old runs, but we shouldn't be setting it.
            name="prompt_builder_name",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # The PromptId of the prompt. Can be a saved prompt, fine-tune, generator name, etc. See PromptId type for more details.
            name="prompt_id",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="file_name",
            type=str,
            required_for=[DataSourceType.file_import],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.synthetic,
                DataSourceType.tool_call,
            ],
        ),
    ]

    @model_validator(mode="after")
    def validate_type(self) -> "DataSource":
        if self.type not in DataSourceType:
            raise ValueError(f"Invalid data source type: {self.type}")
        return self

    @model_validator(mode="after")
    def validate_properties(self) -> "DataSource":
        for prop in self._data_source_properties:
            # Check the property type is correct
            if prop.name in self.properties:
                if not isinstance(self.properties[prop.name], prop.type):
                    raise ValueError(
                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
                    )
            # Check the property is required for the data source type
            if self.type in prop.required_for:
                if prop.name not in self.properties:
                    raise ValueError(
                        f"'{prop.name}' is required for {self.type} data source"
                    )
            # Check the property is not allowed for the data source type
            elif self.type in prop.not_allowed_for and prop.name in self.properties:
                raise ValueError(
                    f"'{prop.name}' is not allowed for {self.type} data source"
                )
        return self

    @model_validator(mode="after")
    def validate_no_empty_properties(self) -> Self:
        for prop, value in self.properties.items():
            if isinstance(value, str) and value == "":
                raise ValueError(
                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
                )
        return self
Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.
Properties vary based on the source type - for synthetic/tool_call sources this includes model information, for human sources this includes creator information, for file imports this includes file information.
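The property rules are easiest to see by example. A sketch of one valid source per type (the email, model name, and adapter name are placeholders; the validators below reject a human source carrying model properties, and vice versa):

from kiln_ai.datamodel import DataSource, DataSourceType

# Human sources require `created_by` and may not carry model properties.
human_source = DataSource(
    type=DataSourceType.human,
    properties={"created_by": "jane@example.com"},
)

# Synthetic sources require model_name, model_provider, and adapter_name.
synthetic_source = DataSource(
    type=DataSourceType.synthetic,
    properties={
        "model_name": "gpt-4o",
        "model_provider": "openai",
        "adapter_name": "prompt_adapter",  # placeholder adapter name
    },
)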
    @model_validator(mode="after")
    def validate_properties(self) -> "DataSource":
        for prop in self._data_source_properties:
            # Check the property type is correct
            if prop.name in self.properties:
                if not isinstance(self.properties[prop.name], prop.type):
                    raise ValueError(
                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
                    )
            # Check the property is required for the data source type
            if self.type in prop.required_for:
                if prop.name not in self.properties:
                    raise ValueError(
                        f"'{prop.name}' is required for {self.type} data source"
                    )
            # Check the property is not allowed for the data source type
            elif self.type in prop.not_allowed_for and prop.name in self.properties:
                raise ValueError(
                    f"'{prop.name}' is not allowed for {self.type} data source"
                )
        return self
    @model_validator(mode="after")
    def validate_no_empty_properties(self) -> Self:
        for prop, value in self.properties.items():
            if isinstance(value, str) and value == "":
                raise ValueError(
                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
                )
        return self
class DataSourceProperty(BaseModel):
    """
    Defines a property that can be associated with a data source.

    Includes validation rules for when properties are required or not allowed
    based on the data source type.
    """

    name: str
    type: Type[Union[str, int, float]]
    required_for: List[DataSourceType] = []
    not_allowed_for: List[DataSourceType] = []
Defines a property that can be associated with a data source.
Includes validation rules for when properties are required or not allowed based on the data source type.
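For illustration, a hypothetical rule in the same shape as the built-in `_data_source_properties` entries shown above (the `file_checksum` property is not part of Kiln):

from kiln_ai.datamodel import DataSourceProperty, DataSourceType

# A string property required for file imports and disallowed elsewhere.
checksum_rule = DataSourceProperty(
    name="file_checksum",  # hypothetical, for illustration only
    type=str,
    required_for=[DataSourceType.file_import],
    not_allowed_for=[
        DataSourceType.human,
        DataSourceType.synthetic,
        DataSourceType.tool_call,
    ],
)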
class DataSourceType(str, Enum):
    """
    The source type of a piece of data.

    Human: a human created the data
    Synthetic: a model created the data
    File import: the data was imported from a file
    Tool call: the data was produced by a tool call
    """

    human = "human"
    synthetic = "synthetic"
    file_import = "file_import"
    tool_call = "tool_call"
The source type of a piece of data.
Human: a human created the data
Synthetic: a model created the data
File import: the data was imported from a file
Tool call: the data was produced by a tool call
class DatasetSplit(KilnParentedModel):
    """
    A collection of task runs, with optional splits (train, test, validation).

    Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

    Maintains a list of IDs for each split, to avoid data duplication.
    """

    name: FilenameString = Field(description="The name of the dataset split.")
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    splits: list[DatasetSplitDefinition] = Field(
        default_factory=list,
        description="The splits in the dataset.",
    )
    split_contents: dict[str, list[str]] = Field(
        description="The contents of each split in the dataset. The key is the split name, and the value is a list of task run IDs.",
    )
    filter: DatasetFilterId | None = Field(
        default=None,
        description="The filter used to build the dataset.",
    )

    @model_validator(mode="after")
    def validate_split_percentages(self) -> "DatasetSplit":
        total = sum(split.percentage for split in self.splits)
        if not math.isclose(total, 1.0, rel_tol=1e-9):
            raise ValueError(f"The sum of split percentages must be 1.0 (got {total})")
        return self

    @classmethod
    def from_task(
        cls,
        name: str,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter_id: DatasetFilterId = "all",
        description: str | None = None,
    ):
        """
        Build a dataset split from a task.
        """
        filter = dataset_filter_from_id(filter_id)
        split_contents = cls.build_split_contents(task, splits, filter)
        return cls(
            parent=task,
            name=name,
            description=description,
            splits=splits,
            split_contents=split_contents,
            filter=filter_id,
        )

    @classmethod
    def build_split_contents(
        cls,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter: DatasetFilter,
    ) -> dict[str, list[str]]:
        valid_ids = []
        for task_run in task.runs():
            if filter(task_run):
                valid_ids.append(task_run.id)

        # Shuffle and split by split percentage
        random.shuffle(valid_ids)
        split_contents = {}
        start_idx = 0
        remaining_items = len(valid_ids)

        # Handle all splits except the last one
        for split in splits[:-1]:
            split_size = round(len(valid_ids) * split.percentage)
            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
            start_idx += split_size
            remaining_items -= split_size

        # Last split gets all remaining items (for rounding)
        if splits:
            split_contents[splits[-1].name] = valid_ids[start_idx:]

        return split_contents

    def parent_task(self) -> "Task | None":
        # inline import to avoid circular import
        from kiln_ai.datamodel import Task

        if not isinstance(self.parent, Task):
            return None
        return self.parent

    def missing_count(self) -> int:
        """
        Returns:
            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
        """
        parent = self.parent_task()
        if parent is None:
            raise ValueError("DatasetSplit has no parent task")

        runs = parent.runs(readonly=True)
        all_ids = set(run.id for run in runs)
        all_ids_in_splits = set()
        for ids in self.split_contents.values():
            all_ids_in_splits.update(ids)
        missing = all_ids_in_splits - all_ids
        return len(missing)
A collection of task runs, with optional splits (train, test, validation).
Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.
Maintains a list of IDs for each split, to avoid data duplication.
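A sketch of freezing an 80/20 split from an existing task using `from_task` (shown below); it assumes `task` is a saved `Task` with runs, and the percentages must sum to 1.0 or validation fails:

from kiln_ai.datamodel import DatasetSplit, DatasetSplitDefinition

splits = [
    DatasetSplitDefinition(name="train", percentage=0.8),
    DatasetSplitDefinition(name="test", percentage=0.2),
]

dataset = DatasetSplit.from_task(
    name="frozen_v1",
    task=task,        # an existing Task with runs
    splits=splits,
    filter_id="all",  # the default filter keeps every run
)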
    @classmethod
    def from_task(
        cls,
        name: str,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter_id: DatasetFilterId = "all",
        description: str | None = None,
    ):
        """
        Build a dataset split from a task.
        """
        filter = dataset_filter_from_id(filter_id)
        split_contents = cls.build_split_contents(task, splits, filter)
        return cls(
            parent=task,
            name=name,
            description=description,
            splits=splits,
            split_contents=split_contents,
            filter=filter_id,
        )
Build a dataset split from a task.
    @classmethod
    def build_split_contents(
        cls,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter: DatasetFilter,
    ) -> dict[str, list[str]]:
        valid_ids = []
        for task_run in task.runs():
            if filter(task_run):
                valid_ids.append(task_run.id)

        # Shuffle and split by split percentage
        random.shuffle(valid_ids)
        split_contents = {}
        start_idx = 0
        remaining_items = len(valid_ids)

        # Handle all splits except the last one
        for split in splits[:-1]:
            split_size = round(len(valid_ids) * split.percentage)
            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
            start_idx += split_size
            remaining_items -= split_size

        # Last split gets all remaining items (for rounding)
        if splits:
            split_contents[splits[-1].name] = valid_ids[start_idx:]

        return split_contents
    def missing_count(self) -> int:
        """
        Returns:
            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
        """
        parent = self.parent_task()
        if parent is None:
            raise ValueError("DatasetSplit has no parent task")

        runs = parent.runs(readonly=True)
        all_ids = set(run.id for run in runs)
        all_ids_in_splits = set()
        for ids in self.split_contents.values():
            all_ids_in_splits.update(ids)
        missing = all_ids_in_splits - all_ids
        return len(missing)
Returns:
    int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
class DatasetSplitDefinition(BaseModel):
    """
    A definition of a split in a dataset.

    Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
    """

    name: FilenameString = Field(
        description="The name of the dataset split definition."
    )
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    percentage: float = Field(
        ge=0.0,
        le=1.0,
        description="The percentage of the dataset that this split represents (between 0 and 1).",
    )
A definition of a split in a dataset.
Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
class ExternalToolServer(KilnParentedModel):
    """
    Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.

    This model stores the necessary configuration to connect to and authenticate with
    external MCP servers that provide tools for LLM interactions.
    """

    name: FilenameString = Field(description="The name of the external tool.")
    type: ToolServerType = Field(
        description="The type of external tool server. Remote tools are hosted on a remote server",
    )
    description: str | None = Field(
        default=None,
        description="A description of the external tool for you and your team. Will not be used in prompts/training/validation.",
    )

    properties: (
        LocalServerProperties | RemoteServerProperties | KilnTaskServerProperties
    ) = Field(
        description="Configuration properties specific to the tool type.",
    )

    # Private variable to store unsaved secrets
    _unsaved_secrets: dict[str, str] = PrivateAttr(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        # Process secrets after initialization (pydantic v2 hook)
        self._process_secrets_from_properties()

    def _process_secrets_from_properties(self) -> None:
        """
        Extract secrets from properties and move them to _unsaved_secrets.
        This removes secrets from the properties dict so they aren't saved to file.
        Clears existing _unsaved_secrets first to handle property updates correctly.
        """
        # Clear existing unsaved secrets since we're reprocessing
        self._unsaved_secrets.clear()

        secret_keys = self.get_secret_keys()

        if not secret_keys:
            return

        # Extract secret values from properties based on server type
        match self.type:
            case ToolServerType.remote_mcp:
                headers = self.properties.get("headers", {})
                for key_name in secret_keys:
                    if key_name in headers:
                        self._unsaved_secrets[key_name] = headers[key_name]
                        # Remove from headers immediately so they are not saved to file
                        del headers[key_name]

            case ToolServerType.local_mcp:
                env_vars = self.properties.get("env_vars", {})
                for key_name in secret_keys:
                    if key_name in env_vars:
                        self._unsaved_secrets[key_name] = env_vars[key_name]
                        # Remove from env_vars immediately so they are not saved to file
                        del env_vars[key_name]

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(self.type)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Override __setattr__ to process secrets whenever properties are updated.
        """
        super().__setattr__(name, value)

        # Process secrets whenever properties are updated
        if name == "properties":
            self._process_secrets_from_properties()

    # Validation Helpers

    @classmethod
    def check_server_url(cls, server_url: str) -> None:
        """Validate Server URL"""
        if not isinstance(server_url, str):
            raise ValueError("Server URL must be a string")

        # Check for leading whitespace in URL
        if server_url != server_url.lstrip():
            raise ValueError("Server URL must not have leading whitespace")

        parsed_url = urlparse(server_url)
        if not parsed_url.netloc:
            raise ValueError("Server URL is not a valid URL")
        if parsed_url.scheme not in ["http", "https"]:
            raise ValueError("Server URL must start with http:// or https://")

    @classmethod
    def check_headers(cls, headers: dict) -> None:
        """Validate Headers"""
        if not isinstance(headers, dict):
            raise ValueError("headers must be a dictionary")

        for key, value in headers.items():
            if not key:
                raise ValueError("Header name is required")
            if not value:
                raise ValueError("Header value is required")

            # Reject invalid header names and CR/LF in names/values
            token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
            if not token_re.match(key):
                raise ValueError(f'Invalid header name: "{key}"')
            if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
                raise ValueError(
                    "Header names/values must not contain invalid characters"
                )

    @classmethod
    def check_secret_keys(
        cls, secret_keys: list, key_type: str, tool_type: str
    ) -> None:
        """Validate Secret Keys (generic method for both header and env var keys)"""
        if not isinstance(secret_keys, list):
            raise ValueError(
                f"{key_type} must be a list for external tools of type '{tool_type}'"
            )
        if not all(isinstance(k, str) for k in secret_keys):
            raise ValueError(f"{key_type} must contain only strings")
        if not all(key for key in secret_keys):
            raise ValueError("Secret key is required")

    @classmethod
    def check_env_vars(cls, env_vars: dict) -> None:
        """Validate Environment Variables"""
        if not isinstance(env_vars, dict):
            raise ValueError("environment variables must be a dictionary")

        # Validate env_vars keys are in the correct format for Environment Variables
        # According to POSIX specification, environment variable names must:
        # - Start with a letter (a-z, A-Z) or underscore (_)
        # - Contain only ASCII letters, digits, and underscores
        for key, _ in env_vars.items():
            if not key or not (
                key[0].isascii() and (key[0].isalpha() or key[0] == "_")
            ):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Must start with a letter or underscore."
                )

            if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
                )

    @classmethod
    def type_from_data(cls, data: dict) -> ToolServerType:
        """Get the tool server type from the data for the validators"""
        raw_type = data.get("type")
        if raw_type is None:
            raise ValueError("type is required")
        try:
            return ToolServerType(raw_type)
        except ValueError:
            valid_types = ", ".join(type.value for type in ToolServerType)
            raise ValueError(f"type must be one of: {valid_types}")

    @model_validator(mode="before")
    def validate_required_fields(cls, data: dict) -> dict:
        """Validate that each tool type has the required configuration."""
        server_type = ExternalToolServer.type_from_data(data)
        properties = data.get("properties", {})

        match server_type:
            case ToolServerType.remote_mcp:
                server_url = properties.get("server_url", None)
                if server_url is None:
                    raise ValueError(
                        "Server URL is required to connect to a remote MCP server"
                    )
                ExternalToolServer.check_server_url(server_url)

            case ToolServerType.local_mcp:
                command = properties.get("command", None)
                if command is None:
                    raise ValueError("command is required to start a local MCP server")
                if not isinstance(command, str):
                    raise ValueError(
                        "command must be a string to start a local MCP server"
                    )
                # Reject empty/whitespace-only command strings
                if command.strip() == "":
                    raise ValueError("command must be a non-empty string")

                args = properties.get("args", None)
                if args is not None:
                    if not isinstance(args, list):
                        raise ValueError(
                            "arguments must be a list to start a local MCP server"
                        )

            case ToolServerType.kiln_task:
                tool_name_validator(properties.get("name", ""))
                err_msg_prefix = "Kiln task server properties:"
                validate_return_dict_prop(
                    properties, "description", str, err_msg_prefix
                )
                description = properties.get("description", "")
                if len(description) > 128:
                    raise ValueError("description must be 128 characters or less")
                validate_return_dict_prop(
                    properties, "is_archived", bool, err_msg_prefix
                )
                validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
                validate_return_dict_prop(
                    properties, "run_config_id", str, err_msg_prefix
                )

            case _:
                # Type checking will catch missing cases
                raise_exhaustive_enum_error(server_type)
        return data

    @model_validator(mode="before")
    def validate_headers_and_env_vars(cls, data: dict) -> dict:
        """
        Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped
        """
        type = ExternalToolServer.type_from_data(data)

        properties = data.get("properties", {})
        if properties is None:
            raise ValueError("properties is required")

        match type:
            case ToolServerType.remote_mcp:
                # Validate headers
                headers = properties.get("headers", None)
                if headers is not None:
                    ExternalToolServer.check_headers(headers)

                # Secret header keys are optional, validate if they are set
                secret_header_keys = properties.get("secret_header_keys", None)
                if secret_header_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_header_keys, "secret_header_keys", "remote_mcp"
                    )

            case ToolServerType.local_mcp:
                # Validate secret environment variable keys
                env_vars = properties.get("env_vars", {})
                if env_vars is not None:
                    ExternalToolServer.check_env_vars(env_vars)

                # Secret env var keys are optional, but if they are set, they must be a list of strings
                secret_env_var_keys = properties.get("secret_env_var_keys", None)
                if secret_env_var_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_env_var_keys, "secret_env_var_keys", "local_mcp"
                    )

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(type)

        return data

    def get_secret_keys(self) -> list[str]:
        """
        Get the list of secret key names based on server type.

        Returns:
            List of secret key names (header names for remote, env var names for local)
        """
        match self.type:
            case ToolServerType.remote_mcp:
                return self.properties.get("secret_header_keys", [])
            case ToolServerType.local_mcp:
                return self.properties.get("secret_env_var_keys", [])
            case ToolServerType.kiln_task:
                return []
            case _:
                raise_exhaustive_enum_error(self.type)

    def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
        """
        Retrieve secrets from configuration system or in-memory storage.
        Automatically determines which secret keys to retrieve based on the server type.
        Config secrets take precedence over unsaved secrets.

        Returns:
            Tuple of (secrets_dict, missing_secrets_list) where:
            - secrets_dict: Dictionary mapping key names to their secret values
            - missing_secrets_list: List of secret key names that are missing values
        """
        secrets = {}
        missing_secrets = []
        secret_keys = self.get_secret_keys()

        if secret_keys and len(secret_keys) > 0:
            config = Config.shared()
            mcp_secrets = config.get_value(MCP_SECRETS_KEY)

            for key_name in secret_keys:
                secret_value = None

                # First check config secrets (persistent storage), key is mcp_server_id::key_name
                secret_key = self._config_secret_key(key_name)
                secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None

                # Fall back to unsaved secrets (in-memory storage)
                if (
                    not secret_value
                    and hasattr(self, "_unsaved_secrets")
                    and key_name in self._unsaved_secrets
                ):
                    secret_value = self._unsaved_secrets[key_name]

                if secret_value:
                    secrets[key_name] = secret_value
                else:
                    missing_secrets.append(key_name)

        return secrets, missing_secrets

    def _save_secrets(self) -> None:
        """
        Save unsaved secrets to the configuration system.
        """
        secret_keys = self.get_secret_keys()

        # No secrets to save
        if not secret_keys:
            return

        if self.id is None:
            raise ValueError("Server ID cannot be None when saving secrets")

        # Check if secrets are already saved
        if not hasattr(self, "_unsaved_secrets") or not self._unsaved_secrets:
            return

        config = Config.shared()
        mcp_secrets: dict[str, str] = config.get_value(MCP_SECRETS_KEY) or {}

        # Store secrets with the pattern: mcp_server_id::key_name
        for key_name, secret_value in self._unsaved_secrets.items():
            secret_key = self._config_secret_key(key_name)
            mcp_secrets[secret_key] = secret_value

        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

        # Clear unsaved secrets after saving
        self._unsaved_secrets.clear()

    def delete_secrets(self) -> None:
        """
        Delete all secrets for this tool server from the configuration system.
        """
        secret_keys = self.get_secret_keys()

        config = Config.shared()
        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()

        # Remove secrets with the pattern: mcp_server_id::key_name
        for key_name in secret_keys:
            secret_key = self._config_secret_key(key_name)
            if secret_key in mcp_secrets:
                del mcp_secrets[secret_key]

        # Always call update_settings to maintain consistency with the old behavior
        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

    def save_to_file(self) -> None:
        """
        Override save_to_file to automatically save any unsaved secrets before saving to file.

        This ensures that secrets are always saved when the object is saved,
        preventing the issue where secrets could be lost if save_to_file is called
        without explicitly saving secrets first.
        """
        # Save any unsaved secrets first
        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
            self._save_secrets()

        # Call the parent save_to_file method
        super().save_to_file()

    # Internal helpers

    def _config_secret_key(self, key_name: str) -> str:
        """
        Generate the secret key pattern for storing/retrieving secrets.

        Args:
            key_name: The name of the secret key

        Returns:
            The formatted secret key: "{server_id}::{key_name}"
        """
        return f"{self.id}::{key_name}"
Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.
This model stores the necessary configuration to connect to and authenticate with external MCP servers that provide tools for LLM interactions.
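A configuration sketch for a remote server; it assumes `ToolServerType.remote_mcp` serializes as "remote_mcp", and the URL and token are placeholders. Because "Authorization" is listed in `secret_header_keys`, its value is stripped from `headers` into in-memory storage on construction (see `_process_secrets_from_properties` above):

from kiln_ai.datamodel import ExternalToolServer

server = ExternalToolServer(
    name="search_tools",
    type="remote_mcp",
    properties={
        "server_url": "https://mcp.example.com",
        "headers": {"Authorization": "Bearer placeholder-token"},
        "secret_header_keys": ["Authorization"],
    },
)
# The Authorization value now lives in _unsaved_secrets and is only
# persisted to the config secret store when save_to_file() is called.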
    def model_post_init(self, __context: Any) -> None:
        # Process secrets after initialization (pydantic v2 hook)
        self._process_secrets_from_properties()
    @classmethod
    def check_server_url(cls, server_url: str) -> None:
        """Validate Server URL"""
        if not isinstance(server_url, str):
            raise ValueError("Server URL must be a string")

        # Check for leading whitespace in URL
        if server_url != server_url.lstrip():
            raise ValueError("Server URL must not have leading whitespace")

        parsed_url = urlparse(server_url)
        if not parsed_url.netloc:
            raise ValueError("Server URL is not a valid URL")
        if parsed_url.scheme not in ["http", "https"]:
            raise ValueError("Server URL must start with http:// or https://")
Validate Server URL
    @classmethod
    def check_headers(cls, headers: dict) -> None:
        """Validate Headers"""
        if not isinstance(headers, dict):
            raise ValueError("headers must be a dictionary")

        for key, value in headers.items():
            if not key:
                raise ValueError("Header name is required")
            if not value:
                raise ValueError("Header value is required")

            # Reject invalid header names and CR/LF in names/values
            token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
            if not token_re.match(key):
                raise ValueError(f'Invalid header name: "{key}"')
            if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
                raise ValueError(
                    "Header names/values must not contain invalid characters"
                )
Validate Headers
    @classmethod
    def check_secret_keys(
        cls, secret_keys: list, key_type: str, tool_type: str
    ) -> None:
        """Validate Secret Keys (generic method for both header and env var keys)"""
        if not isinstance(secret_keys, list):
            raise ValueError(
                f"{key_type} must be a list for external tools of type '{tool_type}'"
            )
        if not all(isinstance(k, str) for k in secret_keys):
            raise ValueError(f"{key_type} must contain only strings")
        if not all(key for key in secret_keys):
            raise ValueError("Secret key is required")
Validate Secret Keys (generic method for both header and env var keys)
    @classmethod
    def check_env_vars(cls, env_vars: dict) -> None:
        """Validate Environment Variables"""
        if not isinstance(env_vars, dict):
            raise ValueError("environment variables must be a dictionary")

        # Validate env_vars keys are in the correct format for Environment Variables
        # According to POSIX specification, environment variable names must:
        # - Start with a letter (a-z, A-Z) or underscore (_)
        # - Contain only ASCII letters, digits, and underscores
        for key, _ in env_vars.items():
            if not key or not (
                key[0].isascii() and (key[0].isalpha() or key[0] == "_")
            ):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Must start with a letter or underscore."
                )

            if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
                )
Validate Environment Variables
    @classmethod
    def type_from_data(cls, data: dict) -> ToolServerType:
        """Get the tool server type from the data for the validators"""
        raw_type = data.get("type")
        if raw_type is None:
            raise ValueError("type is required")
        try:
            return ToolServerType(raw_type)
        except ValueError:
            valid_types = ", ".join(type.value for type in ToolServerType)
            raise ValueError(f"type must be one of: {valid_types}")
Get the tool server type from the data for the validators
    @model_validator(mode="before")
    def validate_required_fields(cls, data: dict) -> dict:
        """Validate that each tool type has the required configuration."""
        server_type = ExternalToolServer.type_from_data(data)
        properties = data.get("properties", {})

        match server_type:
            case ToolServerType.remote_mcp:
                server_url = properties.get("server_url", None)
                if server_url is None:
                    raise ValueError(
                        "Server URL is required to connect to a remote MCP server"
                    )
                ExternalToolServer.check_server_url(server_url)

            case ToolServerType.local_mcp:
                command = properties.get("command", None)
                if command is None:
                    raise ValueError("command is required to start a local MCP server")
                if not isinstance(command, str):
                    raise ValueError(
                        "command must be a string to start a local MCP server"
                    )
                # Reject empty/whitespace-only command strings
                if command.strip() == "":
                    raise ValueError("command must be a non-empty string")

                args = properties.get("args", None)
                if args is not None:
                    if not isinstance(args, list):
                        raise ValueError(
                            "arguments must be a list to start a local MCP server"
                        )

            case ToolServerType.kiln_task:
                tool_name_validator(properties.get("name", ""))
                err_msg_prefix = "Kiln task server properties:"
                validate_return_dict_prop(
                    properties, "description", str, err_msg_prefix
                )
                description = properties.get("description", "")
                if len(description) > 128:
                    raise ValueError("description must be 128 characters or less")
                validate_return_dict_prop(
                    properties, "is_archived", bool, err_msg_prefix
                )
                validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
                validate_return_dict_prop(
                    properties, "run_config_id", str, err_msg_prefix
                )

            case _:
                # Type checking will catch missing cases
                raise_exhaustive_enum_error(server_type)
        return data
Validate that each tool type has the required configuration.
    @model_validator(mode="before")
    def validate_headers_and_env_vars(cls, data: dict) -> dict:
        """
        Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped
        """
        type = ExternalToolServer.type_from_data(data)

        properties = data.get("properties", {})
        if properties is None:
            raise ValueError("properties is required")

        match type:
            case ToolServerType.remote_mcp:
                # Validate headers
                headers = properties.get("headers", None)
                if headers is not None:
                    ExternalToolServer.check_headers(headers)

                # Secret header keys are optional, validate if they are set
                secret_header_keys = properties.get("secret_header_keys", None)
                if secret_header_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_header_keys, "secret_header_keys", "remote_mcp"
                    )

            case ToolServerType.local_mcp:
                # Validate secret environment variable keys
                env_vars = properties.get("env_vars", {})
                if env_vars is not None:
                    ExternalToolServer.check_env_vars(env_vars)

                # Secret env var keys are optional, but if they are set, they must be a list of strings
                secret_env_var_keys = properties.get("secret_env_var_keys", None)
                if secret_env_var_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_env_var_keys, "secret_env_var_keys", "local_mcp"
                    )

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(type)

        return data
Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped.
    def get_secret_keys(self) -> list[str]:
        """
        Get the list of secret key names based on server type.

        Returns:
            List of secret key names (header names for remote, env var names for local)
        """
        match self.type:
            case ToolServerType.remote_mcp:
                return self.properties.get("secret_header_keys", [])
            case ToolServerType.local_mcp:
                return self.properties.get("secret_env_var_keys", [])
            case ToolServerType.kiln_task:
                return []
            case _:
                raise_exhaustive_enum_error(self.type)
Get the list of secret key names based on server type.
Returns:
    List of secret key names (header names for remote, env var names for local)
    def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
        """
        Retrieve secrets from configuration system or in-memory storage.
        Automatically determines which secret keys to retrieve based on the server type.
        Config secrets take precedence over unsaved secrets.

        Returns:
            Tuple of (secrets_dict, missing_secrets_list) where:
            - secrets_dict: Dictionary mapping key names to their secret values
            - missing_secrets_list: List of secret key names that are missing values
        """
        secrets = {}
        missing_secrets = []
        secret_keys = self.get_secret_keys()

        if secret_keys and len(secret_keys) > 0:
            config = Config.shared()
            mcp_secrets = config.get_value(MCP_SECRETS_KEY)

            for key_name in secret_keys:
                secret_value = None

                # First check config secrets (persistent storage), key is mcp_server_id::key_name
                secret_key = self._config_secret_key(key_name)
                secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None

                # Fall back to unsaved secrets (in-memory storage)
                if (
                    not secret_value
                    and hasattr(self, "_unsaved_secrets")
                    and key_name in self._unsaved_secrets
                ):
                    secret_value = self._unsaved_secrets[key_name]

                if secret_value:
                    secrets[key_name] = secret_value
                else:
                    missing_secrets.append(key_name)

        return secrets, missing_secrets
Retrieve secrets from configuration system or in-memory storage. Automatically determines which secret keys to retrieve based on the server type. Config secrets take precedence over unsaved secrets.
Returns:
    Tuple of (secrets_dict, missing_secrets_list) where:
    - secrets_dict: Dictionary mapping key names to their secret values
    - missing_secrets_list: List of secret key names that are missing values
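A short usage sketch of the returned tuple, continuing the `server` example above:

secrets, missing = server.retrieve_secrets()
if missing:
    # e.g. prompt the user to re-enter the missing values
    raise RuntimeError(f"Missing MCP secrets: {missing}")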
    def delete_secrets(self) -> None:
        """
        Delete all secrets for this tool server from the configuration system.
        """
        secret_keys = self.get_secret_keys()

        config = Config.shared()
        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()

        # Remove secrets with the pattern: mcp_server_id::key_name
        for key_name in secret_keys:
            secret_key = self._config_secret_key(key_name)
            if secret_key in mcp_secrets:
                del mcp_secrets[secret_key]

        # Always call update_settings to maintain consistency with the old behavior
        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})
Delete all secrets for this tool server from the configuration system.
    def save_to_file(self) -> None:
        """
        Override save_to_file to automatically save any unsaved secrets before saving to file.

        This ensures that secrets are always saved when the object is saved,
        preventing the issue where secrets could be lost if save_to_file is called
        without explicitly saving secrets first.
        """
        # Save any unsaved secrets first
        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
            self._save_secrets()

        # Call the parent save_to_file method
        super().save_to_file()
Override save_to_file to automatically save any unsaved secrets before saving to file.
This ensures that secrets are always saved when the object is saved, preventing the issue where secrets could be lost if save_to_file is called without explicitly saving secrets first.
class FineTuneStatusType(str, Enum):
    """
    The status type of a fine-tune (running, completed, failed, etc).
    """

    unknown = "unknown"  # server error
    pending = "pending"
    running = "running"
    completed = "completed"
    failed = "failed"
The status type of a fine-tune (running, completed, failed, etc).
class Finetune(KilnParentedModel):
    """
    The Kiln fine-tune datamodel.

    Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
    """

    name: FilenameString = Field(description="The name of the fine-tune.")
    description: str | None = Field(
        default=None,
        description="A description of the fine-tune for you and your team. Not used in training.",
    )
    structured_output_mode: StructuredOutputMode | None = Field(
        default=None,
        description="The mode to use to train the model for structured output, if it was trained with structured output. Will determine how we call the tuned model, so we call with the matching mode.",
    )
    provider: str = Field(
        description="The provider to use for the fine-tune (e.g. 'openai')."
    )
    base_model_id: str = Field(
        description="The id of the base model to use for the fine-tune. This string relates to the provider's IDs for their own models, not Kiln IDs."
    )
    provider_id: str | None = Field(
        default=None,
        description="The ID of the fine-tune job on the provider's side. May not be the same as the fine_tune_model_id.",
    )
    fine_tune_model_id: str | None = Field(
        default=None,
        description="The ID of the fine-tuned model on the provider's side. May not be the same as the provider_id.",
    )
    dataset_split_id: str = Field(
        description="The ID of the dataset split to use for this fine-tune.",
    )
    train_split_name: str = Field(
        default="train",
        description="The name of the training split to use for this fine-tune.",
    )
    validation_split_name: str | None = Field(
        default=None,
        description="The name of the validation split to use for this fine-tune. Optional.",
    )
    parameters: dict[str, str | int | float | bool] = Field(
        default={},
        description="The parameters to use for this fine-tune. These are provider-specific.",
    )
    # These two fields are saved exactly as used for training. Even if they map exactly to a custom prompt or generator, those can change, so we want to keep a record of the training prompt.
    system_message: str = Field(
        description="The system message to use for this fine-tune.",
    )
    thinking_instructions: str | None = Field(
        default=None,
        description="The thinking instructions to use for this fine-tune. Only used when data_strategy is final_and_intermediate.",
    )
    latest_status: FineTuneStatusType = Field(
        default=FineTuneStatusType.unknown,
        description="The latest known status of this fine-tune. Not updated in real time.",
    )
    properties: Dict[str, str | int | float] = Field(
        default={},
        description="Properties of the fine-tune. Different providers may use different properties.",
    )
    data_strategy: ChatStrategy = Field(
        default=ChatStrategy.single_turn,
        description="The strategy to use for training the model. 'final_only' will only train on the final response. 'final_and_intermediate' will train on the final response and intermediate outputs (chain of thought or reasoning).",
    )

    # Workaround to return typed parent without importing Task
    def parent_task(self) -> Union["Task", None]:
        if self.parent is None or self.parent.__class__.__name__ != "Task":
            return None
        return self.parent  # type: ignore

    @model_validator(mode="after")
    def validate_thinking_instructions(self) -> Self:
        if (
            self.thinking_instructions is not None
            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
        ):
            raise ValueError(
                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
            )
        if (
            self.thinking_instructions is None
            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
        ):
            raise ValueError(
                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
            )
        return self
The Kiln fine-tune datamodel.
Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
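An illustrative record only: the base model ID is provider-specific, `dataset_split_id` is a placeholder for the ID of a saved `DatasetSplit`, and the default `data_strategy` (single_turn) requires no `thinking_instructions` per the validator below:

from kiln_ai.datamodel import Finetune

ft = Finetune(
    name="summarizer_ft_1",
    provider="openai",
    base_model_id="gpt-4o-mini-2024-07-18",  # provider-side model ID (illustrative)
    dataset_split_id="dataset-split-id",     # placeholder for a saved DatasetSplit ID
    train_split_name="train",
    validation_split_name="test",
    system_message="Summarize the input in one sentence.",
    parent=task,  # assumes `task` is the Task being tuned
)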
    @model_validator(mode="after")
    def validate_thinking_instructions(self) -> Self:
        if (
            self.thinking_instructions is not None
            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
        ):
            raise ValueError(
                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
            )
        if (
            self.thinking_instructions is None
            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
        ):
            raise ValueError(
                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
            )
        return self
class Priority(IntEnum):
    """Defines priority levels for tasks and requirements, where P0 is highest priority."""

    p0 = 0
    p1 = 1
    p2 = 2
    p3 = 3
Defines priority levels for tasks and requirements, where P0 is highest priority.
class Project(
    KilnParentModel,
    parent_of={
        "tasks": Task,
        "documents": Document,
        "extractor_configs": ExtractorConfig,
        "chunker_configs": ChunkerConfig,
        "embedding_configs": EmbeddingConfig,
        "rag_configs": RagConfig,
        "vector_store_configs": VectorStoreConfig,
        "external_tool_servers": ExternalToolServer,
    },
):
    """
    A collection of related tasks.

    Projects organize tasks into logical groups and provide high-level descriptions
    of the overall goals.
    """

    name: FilenameString = Field(description="The name of the project.")
    description: str | None = Field(
        default=None,
        description="A description of the project for you and your team. Will not be used in prompts/training/validation.",
    )

    # Needed for typechecking. We should fix this in KilnParentModel
    def tasks(self) -> list[Task]:
        return super().tasks()  # type: ignore

    def documents(self, readonly: bool = False) -> list[Document]:
        return super().documents(readonly=readonly)  # type: ignore

    def extractor_configs(self, readonly: bool = False) -> list[ExtractorConfig]:
        return super().extractor_configs(readonly=readonly)  # type: ignore

    def chunker_configs(self, readonly: bool = False) -> list[ChunkerConfig]:
        return super().chunker_configs(readonly=readonly)  # type: ignore

    def embedding_configs(self, readonly: bool = False) -> list[EmbeddingConfig]:
        return super().embedding_configs(readonly=readonly)  # type: ignore

    def vector_store_configs(self, readonly: bool = False) -> list[VectorStoreConfig]:
        return super().vector_store_configs(readonly=readonly)  # type: ignore

    def rag_configs(self, readonly: bool = False) -> list[RagConfig]:
        return super().rag_configs(readonly=readonly)  # type: ignore

    def external_tool_servers(self, readonly: bool = False) -> list[ExternalToolServer]:
        return super().external_tool_servers(readonly=readonly)  # type: ignore
A collection of related tasks.
Projects organize tasks into logical groups and provide high-level descriptions of the overall goals.
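The typed accessors above are the intended way to enumerate a project's children. A brief sketch (note that `tasks()` takes no `readonly` flag in the wrapper, unlike the other accessors):

from kiln_ai.datamodel import Project

project = Project(name="support_bot")
all_tasks = project.tasks()
docs = project.documents(readonly=True)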
class Prompt(KilnParentedModel, BasePrompt):
    """
    A prompt for a task. This is the custom prompt parented by a task.
    """

    pass
A prompt for a task. This is the custom prompt parented by a task.
class PromptGenerators(str, Enum):
    SIMPLE = "simple_prompt_builder"
    MULTI_SHOT = "multi_shot_prompt_builder"
    FEW_SHOT = "few_shot_prompt_builder"
    REPAIRS = "repairs_prompt_builder"
    SIMPLE_CHAIN_OF_THOUGHT = "simple_chain_of_thought_prompt_builder"
    FEW_SHOT_CHAIN_OF_THOUGHT = "few_shot_chain_of_thought_prompt_builder"
    MULTI_SHOT_CHAIN_OF_THOUGHT = "multi_shot_chain_of_thought_prompt_builder"
    SHORT = "short_prompt_builder"
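Since the enum is string-valued, generator IDs can be used anywhere a prompt identifier string is expected (see the PromptId type). A quick sketch:

from kiln_ai.datamodel import PromptGenerators

generator_id = PromptGenerators.SIMPLE.value
assert generator_id == "simple_prompt_builder"  # members are plain strings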
20class RequirementRating(BaseModel): 21 """Rating for a specific requirement within a task output.""" 22 23 value: float = Field( 24 description="The rating value. Interpretation depends on rating type" 25 ) 26 type: TaskOutputRatingType = Field(description="The type of rating")
Rating for a specific requirement within a task output.
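A minimal construction sketch using the two fields shown above:

from kiln_ai.datamodel import RequirementRating, TaskOutputRatingType

# A 4-star rating for one requirement of a task output
req_rating = RequirementRating(value=4.0, type=TaskOutputRatingType.five_star)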
24class StructuredOutputMode(str, Enum): 25 """ 26 Enumeration of supported structured output modes. 27 28 - json_schema: request json using API capabilities for json_schema 29 - function_calling: request json using API capabilities for function calling 30 - json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema 31 - json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings. 32 - json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries). 33 - json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions. 34 - default: let the adapter decide (legacy, do not use for new use cases) 35 - unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should lookup best option at runtime. 36 """ 37 38 default = "default" 39 json_schema = "json_schema" 40 function_calling_weak = "function_calling_weak" 41 function_calling = "function_calling" 42 json_mode = "json_mode" 43 json_instructions = "json_instructions" 44 json_instruction_and_object = "json_instruction_and_object" 45 json_custom_instructions = "json_custom_instructions" 46 unknown = "unknown"
Enumeration of supported structured output modes.
- json_schema: request JSON using API capabilities for json_schema
- function_calling: request JSON using API capabilities for function calling
- json_mode: request JSON using the API's JSON mode, which should return valid JSON but does not check the result against the schema
- json_instructions: append instructions to the prompt requesting JSON that matches the schema. No API capabilities are used; you should have a custom parser for these models, as they return strings.
- json_instruction_and_object: append instructions to the prompt requesting JSON that matches the schema, and also request the response as json_mode via API capabilities (returning dictionaries).
- json_custom_instructions: the model should output JSON, but custom instructions are already included in the system prompt, so no additional JSON instructions are appended.
- default: let the adapter decide (legacy; do not use for new use cases)
- unknown: used when the structured output mode is not known (on old models where it wasn't saved); the best option should be looked up at runtime.
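For illustration, new configurations should pin an explicit mode rather than relying on the legacy `default`; a tiny sketch:

from kiln_ai.datamodel import StructuredOutputMode

mode = StructuredOutputMode.json_schema  # explicit, schema-checked JSON
assert mode.value == "json_schema"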
99class Task( 100 KilnParentedModel, 101 KilnParentModel, 102 parent_of={ 103 "runs": TaskRun, 104 "dataset_splits": DatasetSplit, 105 "finetunes": Finetune, 106 "prompts": Prompt, 107 "evals": Eval, 108 "run_configs": TaskRunConfig, 109 }, 110): 111 """ 112 Represents a specific task to be performed, with associated requirements and validation rules. 113 114 Contains the task definition, requirements, input/output schemas, and maintains 115 a collection of task runs. 116 """ 117 118 name: FilenameString = Field(description="The name of the task.") 119 description: str | None = Field( 120 default=None, 121 description="A description of the task for you and your team. Will not be used in prompts/training/validation.", 122 ) 123 instruction: str = Field( 124 min_length=1, 125 description="The instructions for the task. Will be used in prompts/training/validation.", 126 ) 127 requirements: List[TaskRequirement] = Field(default=[]) 128 output_json_schema: JsonObjectSchema | None = None 129 input_json_schema: JsonObjectSchema | None = None 130 thinking_instruction: str | None = Field( 131 default=None, 132 description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting.", 133 ) 134 135 default_run_config_id: ID_TYPE | None = Field( 136 default=None, 137 description="ID of the run config to use for this task by default. Must exist in saved run configs for this task.", 138 ) 139 140 def output_schema(self) -> Dict | None: 141 if self.output_json_schema is None: 142 return None 143 return schema_from_json_str(self.output_json_schema) 144 145 def input_schema(self) -> Dict | None: 146 if self.input_json_schema is None: 147 return None 148 return schema_from_json_str(self.input_json_schema) 149 150 # These wrappers help for typechecking. We should fix this in KilnParentModel 151 def runs(self, readonly: bool = False) -> list[TaskRun]: 152 return super().runs(readonly=readonly) # type: ignore 153 154 def dataset_splits(self, readonly: bool = False) -> list[DatasetSplit]: 155 return super().dataset_splits(readonly=readonly) # type: ignore 156 157 def finetunes(self, readonly: bool = False) -> list[Finetune]: 158 return super().finetunes(readonly=readonly) # type: ignore 159 160 def prompts(self, readonly: bool = False) -> list[Prompt]: 161 return super().prompts(readonly=readonly) # type: ignore 162 163 def evals(self, readonly: bool = False) -> list[Eval]: 164 return super().evals(readonly=readonly) # type: ignore 165 166 def run_configs(self, readonly: bool = False) -> list[TaskRunConfig]: 167 return super().run_configs(readonly=readonly) # type: ignore 168 169 # Workaround to return typed parent without importing Task 170 def parent_project(self) -> Union["Project", None]: 171 if self.parent is None or self.parent.__class__.__name__ != "Project": 172 return None 173 return self.parent # type: ignore
Represents a specific task to be performed, with associated requirements and validation rules.
Contains the task definition, requirements, input/output schemas, and maintains a collection of task runs.
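A minimal sketch of defining a task with one requirement and a structured output schema (the schema itself is illustrative; output_json_schema stores a JSON schema as a string, which output_schema() parses back into a dict):

import json

from kiln_ai.datamodel import Task, TaskRequirement

task = Task(
    name="summarize",
    instruction="Summarize the input text in one sentence.",
    requirements=[
        TaskRequirement(
            name="concise",
            instruction="The summary must be a single sentence.",
        )
    ],
    output_json_schema=json.dumps(
        {
            "type": "object",
            "properties": {"summary": {"type": "string"}},
            "required": ["summary"],
        }
    ),
)
assert task.output_schema() is not None  # parsed back into a dict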
322class TaskOutput(KilnBaseModel): 323 """ 324 An output for a specific task run. 325 326 Contains the actual output content, its source (human or synthetic), 327 and optional rating information. 328 """ 329 330 output: str = Field( 331 description="The output of the task. JSON formatted for structured output, plaintext for unstructured output." 332 ) 333 source: DataSource | None = Field( 334 description="The source of the output: human or synthetic.", 335 default=None, 336 ) 337 rating: TaskOutputRating | None = Field( 338 default=None, description="The rating of the output" 339 ) 340 341 def validate_output_format(self, task: "Task") -> Self: 342 # validate output 343 if task.output_json_schema is not None: 344 try: 345 output_parsed = json.loads(self.output) 346 except json.JSONDecodeError: 347 raise ValueError("Output is not a valid JSON object") 348 349 validate_schema_with_value_error( 350 output_parsed, 351 task.output_json_schema, 352 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 353 ) 354 return self 355 356 @model_validator(mode="after") 357 def validate_output_source(self, info: ValidationInfo) -> Self: 358 # On strict mode and not loaded from file, we validate output_source is not None. 359 # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data. 360 if not strict_mode(): 361 return self 362 if self.loaded_from_file(info): 363 return self 364 if self.source is None: 365 raise ValueError("Output source is required when strict mode is enabled") 366 return self
An output for a specific task run.
Contains the actual output content, its source (human or synthetic), and optional rating information.
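A construction sketch using the DataSource property rules described earlier (human sources require created_by):

from kiln_ai.datamodel import DataSource, DataSourceType, TaskOutput

output = TaskOutput(
    output='{"summary": "A one-sentence summary."}',
    source=DataSource(
        type=DataSourceType.human,
        properties={"created_by": "alice"},  # required for human sources
    ),
)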
341 def validate_output_format(self, task: "Task") -> Self: 342 # validate output 343 if task.output_json_schema is not None: 344 try: 345 output_parsed = json.loads(self.output) 346 except json.JSONDecodeError: 347 raise ValueError("Output is not a valid JSON object") 348 349 validate_schema_with_value_error( 350 output_parsed, 351 task.output_json_schema, 352 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 353 ) 354 return self
356 @model_validator(mode="after") 357 def validate_output_source(self, info: ValidationInfo) -> Self: 358 # On strict mode and not loaded from file, we validate output_source is not None. 359 # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data. 360 if not strict_mode(): 361 return self 362 if self.loaded_from_file(info): 363 return self 364 if self.source is None: 365 raise ValueError("Output source is required when strict mode is enabled") 366 return self
50class TaskOutputRating(KilnBaseModel): 51 """ 52 A rating for a task output, including an overall rating and ratings for each requirement. 53 54 Supports: 55 - five_star: 1-5 star ratings 56 - pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail) 57 - pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail) 58 """ 59 60 type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star) 61 value: float | None = Field( 62 description="The rating value. Interpretation depends on rating type:\n- five_star: 1-5 stars\n- pass_fail: 1.0 (pass) or 0.0 (fail)\n- pass_fail_critical: 1.0 (pass), 0.0 (fail), or -1.0 (critical fail)", 63 default=None, 64 ) 65 requirement_ratings: Dict[ID_TYPE, RequirementRating] = Field( 66 default={}, 67 description="The ratings of the requirements of the task. The ID can be either a task_requirement_id or a named rating for an eval_output_score name (in format 'named::<name>').", 68 ) 69 70 # Previously we stored rating values as a dict of floats, but now we store them as RequirementRating objects. 71 @model_validator(mode="before") 72 def upgrade_old_format(cls, data: dict) -> dict: 73 if not isinstance(data, dict): 74 return data 75 76 # Check if we have the old format (dict of floats) 77 req_ratings = data.get("requirement_ratings", {}) 78 if req_ratings and all( 79 isinstance(v, (int, float)) for v in req_ratings.values() 80 ): 81 # Convert each float to a RequirementRating object 82 # all ratings are five star at the point we used this format 83 data["requirement_ratings"] = { 84 k: {"value": v, "type": TaskOutputRatingType.five_star} 85 for k, v in req_ratings.items() 86 } 87 88 return data 89 90 # Used to select high quality outputs for example selection (MultiShotPromptBuilder, etc) 91 def is_high_quality(self) -> bool: 92 if self.value is None: 93 return False 94 95 if self.type == TaskOutputRatingType.five_star: 96 return self.value >= 4 97 elif self.type == TaskOutputRatingType.pass_fail: 98 return self.value == 1.0 99 elif self.type == TaskOutputRatingType.pass_fail_critical: 100 return self.value == 1.0 101 return False 102 103 @model_validator(mode="after") 104 def validate_rating(self) -> Self: 105 if self.type not in TaskOutputRatingType: 106 raise ValueError(f"Invalid rating type: {self.type}") 107 108 # Overall rating is optional 109 if self.value is not None: 110 self._validate_rating(self.type, self.value, "overall rating") 111 112 for req_id, req_rating in self.requirement_ratings.items(): 113 self._validate_rating( 114 req_rating.type, 115 req_rating.value, 116 f"requirement rating for req ID: {req_id}", 117 ) 118 119 return self 120 121 def _validate_rating( 122 self, type: TaskOutputRatingType, rating: float | None, rating_name: str 123 ) -> None: 124 if type == TaskOutputRatingType.five_star: 125 self._validate_five_star(rating, rating_name) 126 elif type == TaskOutputRatingType.pass_fail: 127 self._validate_pass_fail(rating, rating_name) 128 elif type == TaskOutputRatingType.pass_fail_critical: 129 self._validate_pass_fail_critical(rating, rating_name) 130 131 def _validate_five_star(self, rating: float | None, rating_name: str) -> None: 132 if rating is None or not isinstance(rating, float) or not rating.is_integer(): 133 raise ValueError( 134 f"{rating_name.capitalize()} of type five_star must be an integer value (1-5)" 135 ) 136 if rating < 1 or rating > 5: 137 raise ValueError( 138 f"{rating_name.capitalize()} of type five_star must be between 1 and 5 stars" 139 ) 140 141 def _validate_pass_fail(self, rating: float | None, rating_name: str) -> None: 142 if rating is None or not isinstance(rating, float) or not rating.is_integer(): 143 raise ValueError( 144 f"{rating_name.capitalize()} of type pass_fail must be an integer value (0 or 1)" 145 ) 146 if rating not in [0, 1]: 147 raise ValueError( 148 f"{rating_name.capitalize()} of type pass_fail must be 0 (fail) or 1 (pass)" 149 ) 150 151 def _validate_pass_fail_critical( 152 self, rating: float | None, rating_name: str 153 ) -> None: 154 if rating is None or not isinstance(rating, float) or not rating.is_integer(): 155 raise ValueError( 156 f"{rating_name.capitalize()} of type pass_fail_critical must be an integer value (-1, 0, or 1)" 157 ) 158 if rating not in [-1, 0, 1]: 159 raise ValueError( 160 f"{rating_name.capitalize()} of type pass_fail_critical must be -1 (critical fail), 0 (fail), or 1 (pass)" 161 )
A rating for a task output, including an overall rating and ratings for each requirement.
Supports:
- five_star: 1-5 star ratings
- pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
- pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
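A sketch of the two common rating types; note that values are floats which must be integer-valued, per the validators below:

from kiln_ai.datamodel import TaskOutputRating, TaskOutputRatingType

five_star = TaskOutputRating(type=TaskOutputRatingType.five_star, value=5.0)
assert five_star.is_high_quality()  # five_star is high quality at >= 4

passed = TaskOutputRating(type=TaskOutputRatingType.pass_fail, value=1.0)
assert passed.is_high_quality()  # pass_fail is high quality only at 1.0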
71 @model_validator(mode="before") 72 def upgrade_old_format(cls, data: dict) -> dict: 73 if not isinstance(data, dict): 74 return data 75 76 # Check if we have the old format (dict of floats) 77 req_ratings = data.get("requirement_ratings", {}) 78 if req_ratings and all( 79 isinstance(v, (int, float)) for v in req_ratings.values() 80 ): 81 # Convert each float to a RequirementRating object 82 # all ratings are five star at the point we used this format 83 data["requirement_ratings"] = { 84 k: {"value": v, "type": TaskOutputRatingType.five_star} 85 for k, v in req_ratings.items() 86 } 87 88 return data
91 def is_high_quality(self) -> bool: 92 if self.value is None: 93 return False 94 95 if self.type == TaskOutputRatingType.five_star: 96 return self.value >= 4 97 elif self.type == TaskOutputRatingType.pass_fail: 98 return self.value == 1.0 99 elif self.type == TaskOutputRatingType.pass_fail_critical: 100 return self.value == 1.0 101 return False
103 @model_validator(mode="after") 104 def validate_rating(self) -> Self: 105 if self.type not in TaskOutputRatingType: 106 raise ValueError(f"Invalid rating type: {self.type}") 107 108 # Overall rating is optional 109 if self.value is not None: 110 self._validate_rating(self.type, self.value, "overall rating") 111 112 for req_id, req_rating in self.requirement_ratings.items(): 113 self._validate_rating( 114 req_rating.type, 115 req_rating.value, 116 f"requirement rating for req ID: {req_id}", 117 ) 118 119 return self
15class TaskOutputRatingType(str, Enum): 16 """Defines the types of rating systems available for task outputs.""" 17 18 five_star = "five_star" 19 pass_fail = "pass_fail" 20 pass_fail_critical = "pass_fail_critical" 21 custom = "custom"
Defines the types of rating systems available for task outputs.
31class TaskRequirement(BaseModel): 32 """ 33 Defines a specific requirement that should be met by task outputs. 34 35 Includes an identifier, name, description, instruction for meeting the requirement, 36 priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom). 37 """ 38 39 id: ID_TYPE = ID_FIELD 40 name: FilenameStringShort = Field(description="The name of the task requirement.") 41 description: str | None = Field(default=None) 42 instruction: str = Field(min_length=1) 43 priority: Priority = Field(default=Priority.p2) 44 type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star)
Defines a specific requirement that should be met by task outputs.
Includes an identifier, name, description, instruction for meeting the requirement, priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
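A short construction sketch; id and priority fall back to their defaults:

from kiln_ai.datamodel import TaskOutputRatingType, TaskRequirement

requirement = TaskRequirement(
    name="no_pii",
    instruction="The output must not contain personally identifiable information.",
    type=TaskOutputRatingType.pass_fail_critical,
)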
77class TaskRun(KilnParentedModel): 78 """ 79 Represents a single execution of a Task. 80 81 Contains the input used, its source, the output produced, and optional 82 repair information if the output needed correction. 83 """ 84 85 input: str = Field( 86 description="The inputs to the task. JSON formatted for structured input, plaintext for unstructured input." 87 ) 88 input_source: DataSource | None = Field( 89 default=None, description="The source of the input: human or synthetic." 90 ) 91 92 output: TaskOutput = Field(description="The output of the task run.") 93 repair_instructions: str | None = Field( 94 default=None, 95 description="Instructions for fixing the output. Should define what is wrong, and how to fix it. Will be used by models for both generating a fixed output, and evaluating future models.", 96 ) 97 repaired_output: TaskOutput | None = Field( 98 default=None, 99 description="A version of the output with issues fixed. This must be a 'fixed' version of the existing output, and not an entirely new output. If you wish to generate an ideal curatorial output for this task unrelated to this output, generate a new TaskOutput with type 'human' instead of using this field.", 100 ) 101 intermediate_outputs: Dict[str, str] | None = Field( 102 default=None, 103 description="Intermediate outputs from the task run. Keys are the names of the intermediate output steps (cot=chain of thought, etc), values are the output data.", 104 ) 105 tags: List[str] = Field( 106 default=[], 107 description="Tags for the task run. Tags are used to categorize task runs for filtering and reporting.", 108 ) 109 usage: Usage | None = Field( 110 default=None, 111 description="Usage information for the task run. This includes the number of input tokens, output tokens, and total tokens used.", 112 ) 113 trace: list[ChatCompletionMessageParam] | None = Field( 114 default=None, 115 description="The trace of the task run in OpenAI format. This is the list of messages that were sent to/from the model.", 116 ) 117 118 def thinking_training_data(self) -> str | None: 119 """ 120 Get the thinking training data from the task run. 121 """ 122 if self.intermediate_outputs is None: 123 return None 124 return self.intermediate_outputs.get( 125 "reasoning" 126 ) or self.intermediate_outputs.get("chain_of_thought") 127 128 def has_thinking_training_data(self) -> bool: 129 """ 130 Does this run have thinking data that we can use to train a thinking model? 131 """ 132 return self.thinking_training_data() is not None 133 134 # Workaround to return typed parent without importing Task 135 def parent_task(self) -> Union["Task", None]: 136 if self.parent is None or self.parent.__class__.__name__ != "Task": 137 return None 138 return self.parent # type: ignore 139 140 @model_validator(mode="after") 141 def validate_input_format(self, info: ValidationInfo) -> Self: 142 # Don't validate if loading from file (not new). Too slow. 143 # We don't allow changing task schema, so this is redundant validation. 144 # Note: we still validate if editing a loaded model 145 if self.loading_from_file(info): 146 # Consider loading an existing model as validated. 147 self._last_validated_input = self.input 148 return self 149 150 # Don't validate if input has not changed. Too slow to run this every time. 151 if ( 152 hasattr(self, "_last_validated_input") 153 and self.input == self._last_validated_input 154 ): 155 return self 156 157 task = self.parent_task() 158 if task is None: 159 # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving) 160 return self 161 162 # validate input 163 if task.input_json_schema is not None: 164 try: 165 input_parsed = json.loads(self.input) 166 except json.JSONDecodeError: 167 raise ValueError("Input is not a valid JSON object") 168 169 validate_schema_with_value_error( 170 input_parsed, 171 task.input_json_schema, 172 "Input does not match task input schema.", 173 ) 174 175 self._last_validated_input = self.input 176 return self 177 178 @model_validator(mode="after") 179 def validate_output_format(self, info: ValidationInfo) -> Self: 180 # Don't validate if loading from file (not new). Too slow. 181 # Note: we still validate if editing a loaded model's output. 182 if self.loading_from_file(info): 183 # Consider loading an existing model as validated. 184 self._last_validated_output = self.output.output if self.output else None 185 return self 186 187 # Don't validate unless output has changed since last validation. 188 # The validator is slow and costly, don't want it running when setting other fields. 189 if ( 190 hasattr(self, "_last_validated_output") 191 and self.output is not None 192 and self.output.output == self._last_validated_output 193 ): 194 return self 195 196 task = self.parent_task() 197 if task is None: 198 return self 199 200 self.output.validate_output_format(task) 201 self._last_validated_output = self.output.output if self.output else None 202 return self 203 204 @model_validator(mode="after") 205 def validate_repaired_output(self) -> Self: 206 if self.repaired_output is not None: 207 if self.repaired_output.rating is not None: 208 raise ValueError( 209 "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed." 210 ) 211 212 task = self.parent_task() 213 if ( 214 task is not None 215 and self.repaired_output.output is not None 216 and task.output_json_schema is not None 217 ): 218 try: 219 output_parsed = json.loads(self.repaired_output.output) 220 except json.JSONDecodeError: 221 raise ValueError("Repaired output is not a valid JSON object") 222 223 validate_schema_with_value_error( 224 output_parsed, 225 task.output_json_schema, 226 "Repaired output does not match task output schema.", 227 ) 228 229 if self.repair_instructions is None and self.repaired_output is not None: 230 raise ValueError( 231 "Repair instructions are required if providing a repaired output." 232 ) 233 if self.repair_instructions is not None and self.repaired_output is None: 234 raise ValueError( 235 "A repaired output is required if providing repair instructions." 236 ) 237 238 return self 239 240 @model_validator(mode="after") 241 def validate_input_source(self, info: ValidationInfo) -> Self: 242 # On strict mode and not loaded from file, we validate input_source is not None. 243 # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data. 244 if not strict_mode(): 245 return self 246 if self.loaded_from_file(info): 247 return self 248 if self.input_source is None: 249 raise ValueError("input_source is required when strict mode is enabled") 250 return self 251 252 @model_validator(mode="after") 253 def validate_tags(self) -> Self: 254 for tag in self.tags: 255 if not tag: 256 raise ValueError("Tags cannot be empty strings") 257 if " " in tag: 258 raise ValueError("Tags cannot contain spaces. Try underscores.") 259 260 return self
Represents a single execution of a Task.
Contains the input used, its source, the output produced, and optional repair information if the output needed correction.
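A minimal sketch of recording a run by hand. Schema validation against the task only applies once the run has a parent, and strict mode additionally requires sources, so the sources below are included for completeness:

from kiln_ai.datamodel import DataSource, DataSourceType, TaskOutput, TaskRun

human = DataSource(type=DataSourceType.human, properties={"created_by": "alice"})

run = TaskRun(
    input="Please summarize: Kiln stores each task run as a datamodel file.",
    input_source=human,
    output=TaskOutput(
        output="Kiln stores each task run as a datamodel file.",
        source=human,
    ),
    tags=["manual_example"],  # tags may not be empty or contain spaces
)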
118 def thinking_training_data(self) -> str | None: 119 """ 120 Get the thinking training data from the task run. 121 """ 122 if self.intermediate_outputs is None: 123 return None 124 return self.intermediate_outputs.get( 125 "reasoning" 126 ) or self.intermediate_outputs.get("chain_of_thought")
Get the thinking training data from the task run.
128 def has_thinking_training_data(self) -> bool: 129 """ 130 Does this run have thinking data that we can use to train a thinking model? 131 """ 132 return self.thinking_training_data() is not None
Does this run have thinking data that we can use to train a thinking model?
140 @model_validator(mode="after") 141 def validate_input_format(self, info: ValidationInfo) -> Self: 142 # Don't validate if loading from file (not new). Too slow. 143 # We don't allow changing task schema, so this is redundant validation. 144 # Note: we still validate if editing a loaded model 145 if self.loading_from_file(info): 146 # Consider loading an existing model as validated. 147 self._last_validated_input = self.input 148 return self 149 150 # Don't validate if input has not changed. Too slow to run this every time. 151 if ( 152 hasattr(self, "_last_validated_input") 153 and self.input == self._last_validated_input 154 ): 155 return self 156 157 task = self.parent_task() 158 if task is None: 159 # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving) 160 return self 161 162 # validate input 163 if task.input_json_schema is not None: 164 try: 165 input_parsed = json.loads(self.input) 166 except json.JSONDecodeError: 167 raise ValueError("Input is not a valid JSON object") 168 169 validate_schema_with_value_error( 170 input_parsed, 171 task.input_json_schema, 172 "Input does not match task input schema.", 173 ) 174 175 self._last_validated_input = self.input 176 return self
178 @model_validator(mode="after") 179 def validate_output_format(self, info: ValidationInfo) -> Self: 180 # Don't validate if loading from file (not new). Too slow. 181 # Note: we still validate if editing a loaded model's output. 182 if self.loading_from_file(info): 183 # Consider loading an existing model as validated. 184 self._last_validated_output = self.output.output if self.output else None 185 return self 186 187 # Don't validate unless output has changed since last validation. 188 # The validator is slow and costly, don't want it running when setting other fields. 189 if ( 190 hasattr(self, "_last_validated_output") 191 and self.output is not None 192 and self.output.output == self._last_validated_output 193 ): 194 return self 195 196 task = self.parent_task() 197 if task is None: 198 return self 199 200 self.output.validate_output_format(task) 201 self._last_validated_output = self.output.output if self.output else None 202 return self
204 @model_validator(mode="after") 205 def validate_repaired_output(self) -> Self: 206 if self.repaired_output is not None: 207 if self.repaired_output.rating is not None: 208 raise ValueError( 209 "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed." 210 ) 211 212 task = self.parent_task() 213 if ( 214 task is not None 215 and self.repaired_output.output is not None 216 and task.output_json_schema is not None 217 ): 218 try: 219 output_parsed = json.loads(self.repaired_output.output) 220 except json.JSONDecodeError: 221 raise ValueError("Repaired output is not a valid JSON object") 222 223 validate_schema_with_value_error( 224 output_parsed, 225 task.output_json_schema, 226 "Repaired output does not match task output schema.", 227 ) 228 229 if self.repair_instructions is None and self.repaired_output is not None: 230 raise ValueError( 231 "Repair instructions are required if providing a repaired output." 232 ) 233 if self.repair_instructions is not None and self.repaired_output is None: 234 raise ValueError( 235 "A repaired output is required if providing repair instructions." 236 ) 237 238 return self
240 @model_validator(mode="after") 241 def validate_input_source(self, info: ValidationInfo) -> Self: 242 # On strict mode and not loaded from file, we validate input_source is not None. 243 # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data. 244 if not strict_mode(): 245 return self 246 if self.loaded_from_file(info): 247 return self 248 if self.input_source is None: 249 raise ValueError("input_source is required when strict mode is enabled") 250 return self
18class Usage(BaseModel): 19 input_tokens: int | None = Field( 20 default=None, 21 description="The number of input tokens used in the task run.", 22 ge=0, 23 ) 24 output_tokens: int | None = Field( 25 default=None, 26 description="The number of output tokens used in the task run.", 27 ge=0, 28 ) 29 total_tokens: int | None = Field( 30 default=None, 31 description="The total number of tokens used in the task run.", 32 ge=0, 33 ) 34 cost: float | None = Field( 35 default=None, 36 description="The cost of the task run in US dollars, saved at runtime (prices can change over time).", 37 ge=0, 38 ) 39 40 def __add__(self, other: "Usage") -> "Usage": 41 """Add two Usage objects together, handling None values gracefully. 42 43 None + None = None 44 None + value = value 45 value + None = value 46 value1 + value2 = value1 + value2 47 """ 48 if not isinstance(other, Usage): 49 raise TypeError(f"Cannot add Usage with {type(other).__name__}") 50 51 def _add_optional_int(a: int | None, b: int | None) -> int | None: 52 if a is None and b is None: 53 return None 54 if a is None: 55 return b 56 if b is None: 57 return a 58 return a + b 59 60 def _add_optional_float(a: float | None, b: float | None) -> float | None: 61 if a is None and b is None: 62 return None 63 if a is None: 64 return b 65 if b is None: 66 return a 67 return a + b 68 69 return Usage( 70 input_tokens=_add_optional_int(self.input_tokens, other.input_tokens), 71 output_tokens=_add_optional_int(self.output_tokens, other.output_tokens), 72 total_tokens=_add_optional_int(self.total_tokens, other.total_tokens), 73 cost=_add_optional_float(self.cost, other.cost), 74 )
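Because __add__ treats None as "unknown" rather than zero, usage from multiple calls can be summed without losing partial information. A quick sketch:

from kiln_ai.datamodel import Usage

first = Usage(input_tokens=100, output_tokens=20, total_tokens=120)
second = Usage(input_tokens=50, output_tokens=10, total_tokens=60, cost=0.002)

combined = first + second
assert combined.input_tokens == 150
assert combined.total_tokens == 180
assert combined.cost == 0.002  # first.cost is None, so the known value is kept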