kiln_ai.datamodel
See our docs for details about our datamodel classes and hierarchy:
Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html
1""" 2See our docs for details about our datamodel classes and hierarchy: 3 4Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html 5 6User docs: https://docs.kiln.tech/developers/kiln-datamodel 7""" 8 9# This component uses "flat" imports so we don't have too much internal structure exposed in the API. 10# for example you can just `from datamodel import Task, Project` instead of `from datamodel.task import Task; from datamodel.project import Project` 11 12from __future__ import annotations 13 14from kiln_ai.datamodel import ( 15 chunk, 16 dataset_split, 17 embedding, 18 eval, 19 extraction, 20 rag, 21 reranker, 22 strict_mode, 23) 24from kiln_ai.datamodel.basemodel import generate_model_id 25from kiln_ai.datamodel.datamodel_enums import ( 26 FineTuneStatusType, 27 Priority, 28 StructuredOutputMode, 29 TaskOutputRatingType, 30) 31from kiln_ai.datamodel.dataset_split import DatasetSplit, DatasetSplitDefinition 32from kiln_ai.datamodel.external_tool_server import ExternalToolServer 33from kiln_ai.datamodel.finetune import Finetune 34from kiln_ai.datamodel.project import Project 35from kiln_ai.datamodel.prompt import BasePrompt, Prompt 36from kiln_ai.datamodel.prompt_id import ( 37 PromptGenerators, 38 PromptId, 39 prompt_generator_values, 40) 41from kiln_ai.datamodel.task import Task, TaskRequirement 42from kiln_ai.datamodel.task_output import ( 43 DataSource, 44 DataSourceProperty, 45 DataSourceType, 46 RequirementRating, 47 TaskOutput, 48 TaskOutputRating, 49) 50from kiln_ai.datamodel.task_run import TaskRun, Usage 51 52__all__ = [ 53 "BasePrompt", 54 "DataSource", 55 "DataSourceProperty", 56 "DataSourceType", 57 "DatasetSplit", 58 "DatasetSplitDefinition", 59 "ExternalToolServer", 60 "FineTuneStatusType", 61 "Finetune", 62 "Priority", 63 "Project", 64 "Prompt", 65 "PromptGenerators", 66 "PromptId", 67 "RequirementRating", 68 "StructuredOutputMode", 69 "Task", 70 "TaskOutput", 71 "TaskOutputRating", 72 "TaskOutputRatingType", 73 "TaskRequirement", 74 "TaskRun", 75 "Usage", 76 "chunk", 77 "dataset_split", 78 "embedding", 79 "eval", 80 "extraction", 81 "generate_model_id", 82 "prompt_generator_values", 83 "rag", 84 "reranker", 85 "strict_mode", 86]
class BasePrompt(BaseModel):
    """
    A prompt for a task. This is the basic data storage format which can be used throughout a project.

    The "Prompt" model name is reserved for the custom prompts parented by a task.
    """

    name: FilenameString = Field(description="The name of the prompt.")
    description: str | None = Field(
        default=None,
        description="A more detailed description of the prompt.",
    )
    generator_id: str | None = Field(
        default=None,
        description="The id of the generator that created this prompt.",
    )
    prompt: str = Field(
        description="The prompt for the task.",
        min_length=1,
    )
    chain_of_thought_instructions: str | None = Field(
        default=None,
        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting. COT will not be used unless this is provided.",
    )
A prompt for a task. This is the basic data storage format which can be used throughout a project.
The "Prompt" model name is reserved for the custom prompts parented by a task.
class DataSource(BaseModel):
    """
    Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

    Properties vary based on the source type - for synthetic/tool_call sources this includes
    model information, for human sources this includes creator information, for file imports
    this includes file information.
    """

    type: DataSourceType
    properties: Dict[str, str | int | float] = Field(
        default={},
        description="Properties describing the data source. For synthetic things like model. For human: the human's name. For file_import: file information.",
    )
    run_config: Optional[RunConfigProperties] = Field(
        default=None,
        description="The run config used to generate the data, if generated by running a model in Kiln (only true for type=synthetic).",
    )

    _data_source_properties = [
        DataSourceProperty(
            name="created_by",
            type=str,
            required_for=[DataSourceType.human],
            not_allowed_for=[
                DataSourceType.synthetic,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_provider",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="adapter_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # Legacy field -- allow loading from old runs, but we shouldn't be setting it.
            name="prompt_builder_name",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # The PromptId of the prompt. Can be a saved prompt, fine-tune, generator name, etc. See PromptId type for more details.
            name="prompt_id",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="file_name",
            type=str,
            required_for=[DataSourceType.file_import],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.synthetic,
                DataSourceType.tool_call,
            ],
        ),
    ]

    @model_validator(mode="after")
    def validate_type(self) -> "DataSource":
        if self.type not in DataSourceType:
            raise ValueError(f"Invalid data source type: {self.type}")
        return self

    @model_validator(mode="after")
    def validate_properties(self) -> "DataSource":
        for prop in self._data_source_properties:
            # Check the property type is correct
            if prop.name in self.properties:
                if not isinstance(self.properties[prop.name], prop.type):
                    raise ValueError(
                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
                    )
            # Check the property is required for the data source type
            if self.type in prop.required_for:
                if prop.name not in self.properties:
                    raise ValueError(
                        f"'{prop.name}' is required for {self.type} data source"
                    )
            # Check the property is not allowed for the data source type
            elif self.type in prop.not_allowed_for and prop.name in self.properties:
                raise ValueError(
                    f"'{prop.name}' is not allowed for {self.type} data source"
                )
        return self

    @model_validator(mode="after")
    def validate_no_empty_properties(self) -> Self:
        for prop, value in self.properties.items():
            if isinstance(value, str) and value == "":
                raise ValueError(
                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
                )
        return self
Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.
Properties vary based on the source type - for synthetic/tool_call sources this includes model information, for human sources this includes creator information, for file imports this includes file information.
@model_validator(mode="after")
def validate_properties(self) -> "DataSource":
    for prop in self._data_source_properties:
        # Check the property type is correct
        if prop.name in self.properties:
            if not isinstance(self.properties[prop.name], prop.type):
                raise ValueError(
                    f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
                )
        # Check the property is required for the data source type
        if self.type in prop.required_for:
            if prop.name not in self.properties:
                raise ValueError(
                    f"'{prop.name}' is required for {self.type} data source"
                )
        # Check the property is not allowed for the data source type
        elif self.type in prop.not_allowed_for and prop.name in self.properties:
            raise ValueError(
                f"'{prop.name}' is not allowed for {self.type} data source"
            )
    return self
@model_validator(mode="after")
def validate_no_empty_properties(self) -> Self:
    for prop, value in self.properties.items():
        if isinstance(value, str) and value == "":
            raise ValueError(
                f"Property '{prop}' must be a non-empty string for {self.type} data source"
            )
    return self
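Taken together, these validators mean the required properties depend entirely on the chosen type. A minimal sketch of a valid synthetic source and a valid human source (all property values are invented):

    from kiln_ai.datamodel import DataSource, DataSourceType

    # Synthetic data must record which model produced it.
    synthetic = DataSource(
        type=DataSourceType.synthetic,
        properties={
            "model_name": "gpt-4o",          # required for synthetic
            "model_provider": "openai",      # required for synthetic
            "adapter_name": "kiln_adapter",  # required for synthetic
        },
    )

    # Human data records who created it; model properties are not allowed.
    human = DataSource(
        type=DataSourceType.human,
        properties={"created_by": "jane@example.com"},  # required for human
    )

    # Raises ValueError: 'model_name' is not allowed for human data sources.
    # DataSource(type=DataSourceType.human, properties={"model_name": "gpt-4o"})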
class DataSourceProperty(BaseModel):
    """
    Defines a property that can be associated with a data source.

    Includes validation rules for when properties are required or not allowed
    based on the data source type.
    """

    name: str
    type: Type[Union[str, int, float]]
    required_for: List[DataSourceType] = []
    not_allowed_for: List[DataSourceType] = []
Defines a property that can be associated with a data source.
Includes validation rules for when properties are required or not allowed based on the data source type.
class DataSourceType(str, Enum):
    """
    The source type of a piece of data.

    Human: a human created the data
    Synthetic: a model created the data
    """

    human = "human"
    synthetic = "synthetic"
    file_import = "file_import"
    tool_call = "tool_call"
The source type of a piece of data.
Human: a human created the data
Synthetic: a model created the data
File import: the data was imported from a file
Tool call: the data was produced by a tool call
class DatasetSplit(KilnParentedModel):
    """
    A collection of task runs, with optional splits (train, test, validation).

    Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

    Maintains a list of IDs for each split, to avoid data duplication.
    """

    name: FilenameString = Field(description="The name of the dataset split.")
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    splits: list[DatasetSplitDefinition] = Field(
        default_factory=list,
        description="The splits in the dataset.",
    )
    split_contents: dict[str, list[str]] = Field(
        description="The contents of each split in the dataset. The key is the split name, and the value is a list of task run IDs.",
    )
    filter: DatasetFilterId | None = Field(
        default=None,
        description="The filter used to build the dataset.",
    )

    @model_validator(mode="after")
    def validate_split_percentages(self) -> "DatasetSplit":
        total = sum(split.percentage for split in self.splits)
        if not math.isclose(total, 1.0, rel_tol=1e-9):
            raise ValueError(f"The sum of split percentages must be 1.0 (got {total})")
        return self

    @classmethod
    def from_task(
        cls,
        name: str,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter_id: DatasetFilterId = "all",
        description: str | None = None,
    ):
        """
        Build a dataset split from a task.
        """
        filter = dataset_filter_from_id(filter_id)
        split_contents = cls.build_split_contents(task, splits, filter)
        return cls(
            parent=task,
            name=name,
            description=description,
            splits=splits,
            split_contents=split_contents,
            filter=filter_id,
        )

    @classmethod
    def build_split_contents(
        cls,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter: DatasetFilter,
    ) -> dict[str, list[str]]:
        valid_ids = []
        for task_run in task.runs():
            if filter(task_run):
                valid_ids.append(task_run.id)

        # Shuffle and split by split percentage
        random.shuffle(valid_ids)
        split_contents = {}
        start_idx = 0
        remaining_items = len(valid_ids)

        # Handle all splits except the last one
        for split in splits[:-1]:
            split_size = round(len(valid_ids) * split.percentage)
            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
            start_idx += split_size
            remaining_items -= split_size

        # Last split gets all remaining items (for rounding)
        if splits:
            split_contents[splits[-1].name] = valid_ids[start_idx:]

        return split_contents

    def parent_task(self) -> "Task | None":
        # inline import to avoid circular import
        from kiln_ai.datamodel import Task

        if not isinstance(self.parent, Task):
            return None
        return self.parent

    def missing_count(self) -> int:
        """
        Returns:
            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
        """
        parent = self.parent_task()
        if parent is None:
            raise ValueError("DatasetSplit has no parent task")

        runs = parent.runs(readonly=True)
        all_ids = set(run.id for run in runs)
        all_ids_in_splits = set()
        for ids in self.split_contents.values():
            all_ids_in_splits.update(ids)
        missing = all_ids_in_splits - all_ids
        return len(missing)
A collection of task runs, with optional splits (train, test, validation).
Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.
Maintains a list of IDs for each split, to avoid data duplication.
@classmethod
def from_task(
    cls,
    name: str,
    task: "Task",
    splits: list[DatasetSplitDefinition],
    filter_id: DatasetFilterId = "all",
    description: str | None = None,
):
    """
    Build a dataset split from a task.
    """
    filter = dataset_filter_from_id(filter_id)
    split_contents = cls.build_split_contents(task, splits, filter)
    return cls(
        parent=task,
        name=name,
        description=description,
        splits=splits,
        split_contents=split_contents,
        filter=filter_id,
    )
Build a dataset split from a task.
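For example, freezing a task's runs into an 80/20 train/test split might look like this sketch, where task is an existing Task with saved runs and the split names are arbitrary:

    from kiln_ai.datamodel import DatasetSplit, DatasetSplitDefinition

    split = DatasetSplit.from_task(
        name="v1-train-test",
        task=task,
        splits=[
            DatasetSplitDefinition(name="train", percentage=0.8),
            DatasetSplitDefinition(name="test", percentage=0.2),
        ],
        # filter_id defaults to "all"; percentages must sum to 1.0
    )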
@classmethod
def build_split_contents(
    cls,
    task: "Task",
    splits: list[DatasetSplitDefinition],
    filter: DatasetFilter,
) -> dict[str, list[str]]:
    valid_ids = []
    for task_run in task.runs():
        if filter(task_run):
            valid_ids.append(task_run.id)

    # Shuffle and split by split percentage
    random.shuffle(valid_ids)
    split_contents = {}
    start_idx = 0
    remaining_items = len(valid_ids)

    # Handle all splits except the last one
    for split in splits[:-1]:
        split_size = round(len(valid_ids) * split.percentage)
        split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
        start_idx += split_size
        remaining_items -= split_size

    # Last split gets all remaining items (for rounding)
    if splits:
        split_contents[splits[-1].name] = valid_ids[start_idx:]

    return split_contents
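Note the rounding behavior: every split except the last is sized with round(), and the final split absorbs whatever remains. A quick sketch of the arithmetic for 11 runs split 0.8/0.1/0.1:

    n = 11                    # task runs after filtering
    train = round(n * 0.8)    # 9
    val = round(n * 0.1)      # 1
    test = n - train - val    # 1 -- the last split takes the remainder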
def missing_count(self) -> int:
    """
    Returns:
        int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
    """
    parent = self.parent_task()
    if parent is None:
        raise ValueError("DatasetSplit has no parent task")

    runs = parent.runs(readonly=True)
    all_ids = set(run.id for run in runs)
    all_ids_in_splits = set()
    for ids in self.split_contents.values():
        all_ids_in_splits.update(ids)
    missing = all_ids_in_splits - all_ids
    return len(missing)
Returns:
    int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
class DatasetSplitDefinition(BaseModel):
    """
    A definition of a split in a dataset.

    Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
    """

    name: FilenameString = Field(
        description="The name of the dataset split definition."
    )
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    percentage: float = Field(
        ge=0.0,
        le=1.0,
        description="The percentage of the dataset that this split represents (between 0 and 1).",
    )
A definition of a split in a dataset.
Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
class ExternalToolServer(KilnParentedModel):
    """
    Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.

    This model stores the necessary configuration to connect to and authenticate with
    external MCP servers that provide tools for LLM interactions.
    """

    name: FilenameString = Field(description="The name of the external tool.")
    type: ToolServerType = Field(
        description="The type of external tool server. Remote tools are hosted on a remote server",
    )
    description: str | None = Field(
        default=None,
        description="A description of the external tool for you and your team. Will not be used in prompts/training/validation.",
    )

    properties: (
        LocalServerProperties | RemoteServerProperties | KilnTaskServerProperties
    ) = Field(
        description="Configuration properties specific to the tool type.",
    )

    # Private variable to store unsaved secrets
    _unsaved_secrets: dict[str, str] = PrivateAttr(default_factory=dict)

    def model_post_init(self, __context: Any) -> None:
        # Process secrets after initialization (pydantic v2 hook)
        self._process_secrets_from_properties()

    def _process_secrets_from_properties(self) -> None:
        """
        Extract secrets from properties and move them to _unsaved_secrets.
        This removes secrets from the properties dict so they aren't saved to file.
        Clears existing _unsaved_secrets first to handle property updates correctly.
        """
        # Clear existing unsaved secrets since we're reprocessing
        self._unsaved_secrets.clear()

        secret_keys = self.get_secret_keys()

        if not secret_keys:
            return

        # Extract secret values from properties based on server type
        match self.type:
            case ToolServerType.remote_mcp:
                headers = self.properties.get("headers", {})
                for key_name in secret_keys:
                    if key_name in headers:
                        self._unsaved_secrets[key_name] = headers[key_name]
                        # Remove from headers immediately so they are not saved to file
                        del headers[key_name]

            case ToolServerType.local_mcp:
                env_vars = self.properties.get("env_vars", {})
                for key_name in secret_keys:
                    if key_name in env_vars:
                        self._unsaved_secrets[key_name] = env_vars[key_name]
                        # Remove from env_vars immediately so they are not saved to file
                        del env_vars[key_name]

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(self.type)

    def __setattr__(self, name: str, value: Any) -> None:
        """
        Override __setattr__ to process secrets whenever properties are updated.
        """
        super().__setattr__(name, value)

        # Process secrets whenever properties are updated
        if name == "properties":
            self._process_secrets_from_properties()

    # Validation Helpers

    @classmethod
    def check_server_url(cls, server_url: str) -> None:
        """Validate Server URL"""
        if not isinstance(server_url, str):
            raise ValueError("Server URL must be a string")

        # Check for leading whitespace in URL
        if server_url != server_url.lstrip():
            raise ValueError("Server URL must not have leading whitespace")

        parsed_url = urlparse(server_url)
        if not parsed_url.netloc:
            raise ValueError("Server URL is not a valid URL")
        if parsed_url.scheme not in ["http", "https"]:
            raise ValueError("Server URL must start with http:// or https://")

    @classmethod
    def check_headers(cls, headers: dict) -> None:
        """Validate Headers"""
        if not isinstance(headers, dict):
            raise ValueError("headers must be a dictionary")

        for key, value in headers.items():
            if not key:
                raise ValueError("Header name is required")
            if not value:
                raise ValueError("Header value is required")

            # Reject invalid header names and CR/LF in names/values
            token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
            if not token_re.match(key):
                raise ValueError(f'Invalid header name: "{key}"')
            if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
                raise ValueError(
                    "Header names/values must not contain invalid characters"
                )

    @classmethod
    def check_secret_keys(
        cls, secret_keys: list, key_type: str, tool_type: str
    ) -> None:
        """Validate Secret Keys (generic method for both header and env var keys)"""
        if not isinstance(secret_keys, list):
            raise ValueError(
                f"{key_type} must be a list for external tools of type '{tool_type}'"
            )
        if not all(isinstance(k, str) for k in secret_keys):
            raise ValueError(f"{key_type} must contain only strings")
        if not all(key for key in secret_keys):
            raise ValueError("Secret key is required")

    @classmethod
    def check_env_vars(cls, env_vars: dict) -> None:
        """Validate Environment Variables"""
        if not isinstance(env_vars, dict):
            raise ValueError("environment variables must be a dictionary")

        # Validate env_vars keys are in the correct format for Environment Variables
        # According to POSIX specification, environment variable names must:
        # - Start with a letter (a-z, A-Z) or underscore (_)
        # - Contain only ASCII letters, digits, and underscores
        for key, _ in env_vars.items():
            if not key or not (
                key[0].isascii() and (key[0].isalpha() or key[0] == "_")
            ):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Must start with a letter or underscore."
                )

            if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
                raise ValueError(
                    f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
                )

    @classmethod
    def type_from_data(cls, data: dict) -> ToolServerType:
        """Get the tool server type from the data for the validators"""
        raw_type = data.get("type")
        if raw_type is None:
            raise ValueError("type is required")
        try:
            return ToolServerType(raw_type)
        except ValueError:
            valid_types = ", ".join(type.value for type in ToolServerType)
            raise ValueError(f"type must be one of: {valid_types}")

    @model_validator(mode="before")
    def upgrade_old_properties(cls, data: dict) -> dict:
        """
        Upgrade properties for backwards compatibility.
        """
        properties = data.get("properties")
        if properties is not None and "is_archived" not in properties:
            # Add is_archived field with default value back to data
            properties["is_archived"] = False
            data["properties"] = properties
        return data

    @model_validator(mode="before")
    def validate_required_fields(cls, data: dict) -> dict:
        """Validate that each tool type has the required configuration."""
        server_type = ExternalToolServer.type_from_data(data)
        properties = data.get("properties", {})

        match server_type:
            case ToolServerType.remote_mcp:
                server_url = properties.get("server_url", None)
                if server_url is None:
                    raise ValueError(
                        "Server URL is required to connect to a remote MCP server"
                    )
                ExternalToolServer.check_server_url(server_url)

            case ToolServerType.local_mcp:
                command = properties.get("command", None)
                if command is None:
                    raise ValueError("command is required to start a local MCP server")
                if not isinstance(command, str):
                    raise ValueError(
                        "command must be a string to start a local MCP server"
                    )
                # Reject empty/whitespace-only command strings
                if command.strip() == "":
                    raise ValueError("command must be a non-empty string")

                args = properties.get("args", None)
                if args is not None:
                    if not isinstance(args, list):
                        raise ValueError(
                            "arguments must be a list to start a local MCP server"
                        )

            case ToolServerType.kiln_task:
                tool_name_validator(properties.get("name", ""))
                err_msg_prefix = "Kiln task server properties:"
                validate_return_dict_prop(
                    properties, "description", str, err_msg_prefix
                )
                description = properties.get("description", "")
                if len(description) > 128:
                    raise ValueError("description must be 128 characters or less")
                validate_return_dict_prop(
                    properties, "is_archived", bool, err_msg_prefix
                )
                validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
                validate_return_dict_prop(
                    properties, "run_config_id", str, err_msg_prefix
                )

            case _:
                # Type checking will catch missing cases
                raise_exhaustive_enum_error(server_type)
        return data

    @model_validator(mode="before")
    def validate_headers_and_env_vars(cls, data: dict) -> dict:
        """
        Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped
        """
        type = ExternalToolServer.type_from_data(data)

        properties = data.get("properties", {})
        if properties is None:
            raise ValueError("properties is required")

        match type:
            case ToolServerType.remote_mcp:
                # Validate headers
                headers = properties.get("headers", None)
                if headers is not None:
                    ExternalToolServer.check_headers(headers)

                # Secret header keys are optional, validate if they are set
                secret_header_keys = properties.get("secret_header_keys", None)
                if secret_header_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_header_keys, "secret_header_keys", "remote_mcp"
                    )

            case ToolServerType.local_mcp:
                # Validate secret environment variable keys
                env_vars = properties.get("env_vars", {})
                if env_vars is not None:
                    ExternalToolServer.check_env_vars(env_vars)

                # Secret env var keys are optional, but if they are set, they must be a list of strings
                secret_env_var_keys = properties.get("secret_env_var_keys", None)
                if secret_env_var_keys is not None:
                    ExternalToolServer.check_secret_keys(
                        secret_env_var_keys, "secret_env_var_keys", "local_mcp"
                    )

            case ToolServerType.kiln_task:
                pass

            case _:
                raise_exhaustive_enum_error(type)

        return data

    def get_secret_keys(self) -> list[str]:
        """
        Get the list of secret key names based on server type.

        Returns:
            List of secret key names (header names for remote, env var names for local)
        """
        match self.type:
            case ToolServerType.remote_mcp:
                return self.properties.get("secret_header_keys", [])
            case ToolServerType.local_mcp:
                return self.properties.get("secret_env_var_keys", [])
            case ToolServerType.kiln_task:
                return []
            case _:
                raise_exhaustive_enum_error(self.type)

    def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
        """
        Retrieve secrets from configuration system or in-memory storage.
        Automatically determines which secret keys to retrieve based on the server type.
        Config secrets take precedence over unsaved secrets.

        Returns:
            Tuple of (secrets_dict, missing_secrets_list) where:
            - secrets_dict: Dictionary mapping key names to their secret values
            - missing_secrets_list: List of secret key names that are missing values
        """
        secrets = {}
        missing_secrets = []
        secret_keys = self.get_secret_keys()

        if secret_keys and len(secret_keys) > 0:
            config = Config.shared()
            mcp_secrets = config.get_value(MCP_SECRETS_KEY)

            for key_name in secret_keys:
                secret_value = None

                # First check config secrets (persistent storage), key is mcp_server_id::key_name
                secret_key = self._config_secret_key(key_name)
                secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None

                # Fall back to unsaved secrets (in-memory storage)
                if (
                    not secret_value
                    and hasattr(self, "_unsaved_secrets")
                    and key_name in self._unsaved_secrets
                ):
                    secret_value = self._unsaved_secrets[key_name]

                if secret_value:
                    secrets[key_name] = secret_value
                else:
                    missing_secrets.append(key_name)

        return secrets, missing_secrets

    def _save_secrets(self) -> None:
        """
        Save unsaved secrets to the configuration system.
        """
        secret_keys = self.get_secret_keys()

        # No secrets to save
        if not secret_keys:
            return

        if self.id is None:
            raise ValueError("Server ID cannot be None when saving secrets")

        # Check if secrets are already saved
        if not hasattr(self, "_unsaved_secrets") or not self._unsaved_secrets:
            return

        config = Config.shared()
        mcp_secrets: dict[str, str] = config.get_value(MCP_SECRETS_KEY) or {}

        # Store secrets with the pattern: mcp_server_id::key_name
        for key_name, secret_value in self._unsaved_secrets.items():
            secret_key = self._config_secret_key(key_name)
            mcp_secrets[secret_key] = secret_value

        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

        # Clear unsaved secrets after saving
        self._unsaved_secrets.clear()

    def delete_secrets(self) -> None:
        """
        Delete all secrets for this tool server from the configuration system.
        """
        secret_keys = self.get_secret_keys()

        config = Config.shared()
        mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()

        # Remove secrets with the pattern: mcp_server_id::key_name
        for key_name in secret_keys:
            secret_key = self._config_secret_key(key_name)
            if secret_key in mcp_secrets:
                del mcp_secrets[secret_key]

        # Always call update_settings to maintain consistency with the old behavior
        config.update_settings({MCP_SECRETS_KEY: mcp_secrets})

    def save_to_file(self) -> None:
        """
        Override save_to_file to automatically save any unsaved secrets before saving to file.

        This ensures that secrets are always saved when the object is saved,
        preventing the issue where secrets could be lost if save_to_file is called
        without explicitly saving secrets first.
        """
        # Save any unsaved secrets first
        if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
            self._save_secrets()

        # Call the parent save_to_file method
        super().save_to_file()

    # Internal helpers

    def _config_secret_key(self, key_name: str) -> str:
        """
        Generate the secret key pattern for storing/retrieving secrets.

        Args:
            key_name: The name of the secret key

        Returns:
            The formatted secret key: "{server_id}::{key_name}"
        """
        return f"{self.id}::{key_name}"
Configuration for communicating with an external MCP (Model Context Protocol) server for LLM tool calls. External tool servers can be remote or local.
This model stores the necessary configuration to connect to and authenticate with external MCP servers that provide tools for LLM interactions.
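As a sketch of the shape this configuration takes, here is a hypothetical remote MCP server. The import path for ToolServerType is an assumption (it is not re-exported in the package listing above), and all values are invented. Any header named in secret_header_keys is stripped from properties at init time and held in memory until saved:

    from kiln_ai.datamodel import ExternalToolServer
    # Assumption: ToolServerType lives in the same module as ExternalToolServer.
    from kiln_ai.datamodel.external_tool_server import ToolServerType

    server = ExternalToolServer(
        name="weather-tools",
        type=ToolServerType.remote_mcp,
        properties={
            "server_url": "https://mcp.example.com",        # required for remote_mcp
            "headers": {"Authorization": "Bearer abc123"},  # invented token
            "secret_header_keys": ["Authorization"],        # moved out of headers at init
        },
    )
    # server.properties["headers"] no longer contains "Authorization"; the value
    # is kept in _unsaved_secrets and persisted to config on save_to_file().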
def model_post_init(self, __context: Any) -> None:
    # Process secrets after initialization (pydantic v2 hook)
    self._process_secrets_from_properties()
@classmethod
def check_server_url(cls, server_url: str) -> None:
    """Validate Server URL"""
    if not isinstance(server_url, str):
        raise ValueError("Server URL must be a string")

    # Check for leading whitespace in URL
    if server_url != server_url.lstrip():
        raise ValueError("Server URL must not have leading whitespace")

    parsed_url = urlparse(server_url)
    if not parsed_url.netloc:
        raise ValueError("Server URL is not a valid URL")
    if parsed_url.scheme not in ["http", "https"]:
        raise ValueError("Server URL must start with http:// or https://")
Validate Server URL
@classmethod
def check_headers(cls, headers: dict) -> None:
    """Validate Headers"""
    if not isinstance(headers, dict):
        raise ValueError("headers must be a dictionary")

    for key, value in headers.items():
        if not key:
            raise ValueError("Header name is required")
        if not value:
            raise ValueError("Header value is required")

        # Reject invalid header names and CR/LF in names/values
        token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$")
        if not token_re.match(key):
            raise ValueError(f'Invalid header name: "{key}"')
        if re.search(r"\r|\n", key) or re.search(r"\r|\n", value):
            raise ValueError(
                "Header names/values must not contain invalid characters"
            )
Validate Headers
@classmethod
def check_secret_keys(
    cls, secret_keys: list, key_type: str, tool_type: str
) -> None:
    """Validate Secret Keys (generic method for both header and env var keys)"""
    if not isinstance(secret_keys, list):
        raise ValueError(
            f"{key_type} must be a list for external tools of type '{tool_type}'"
        )
    if not all(isinstance(k, str) for k in secret_keys):
        raise ValueError(f"{key_type} must contain only strings")
    if not all(key for key in secret_keys):
        raise ValueError("Secret key is required")
Validate Secret Keys (generic method for both header and env var keys)
@classmethod
def check_env_vars(cls, env_vars: dict) -> None:
    """Validate Environment Variables"""
    if not isinstance(env_vars, dict):
        raise ValueError("environment variables must be a dictionary")

    # Validate env_vars keys are in the correct format for Environment Variables
    # According to POSIX specification, environment variable names must:
    # - Start with a letter (a-z, A-Z) or underscore (_)
    # - Contain only ASCII letters, digits, and underscores
    for key, _ in env_vars.items():
        if not key or not (
            key[0].isascii() and (key[0].isalpha() or key[0] == "_")
        ):
            raise ValueError(
                f"Invalid environment variable key: {key}. Must start with a letter or underscore."
            )

        if not all(c.isascii() and (c.isalnum() or c == "_") for c in key):
            raise ValueError(
                f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores."
            )
Validate Environment Variables
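A short sketch of what this POSIX naming rule accepts and rejects, using the classmethod above:

    from kiln_ai.datamodel import ExternalToolServer

    ExternalToolServer.check_env_vars({"API_KEY": "x", "_HOME": "y"})  # passes

    try:
        ExternalToolServer.check_env_vars({"1BAD": "x"})  # starts with a digit
    except ValueError as e:
        print(e)  # Invalid environment variable key: 1BAD. Must start with a letter or underscore.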
@classmethod
def type_from_data(cls, data: dict) -> ToolServerType:
    """Get the tool server type from the data for the validators"""
    raw_type = data.get("type")
    if raw_type is None:
        raise ValueError("type is required")
    try:
        return ToolServerType(raw_type)
    except ValueError:
        valid_types = ", ".join(type.value for type in ToolServerType)
        raise ValueError(f"type must be one of: {valid_types}")
Get the tool server type from the data for the validators
@model_validator(mode="before")
def upgrade_old_properties(cls, data: dict) -> dict:
    """
    Upgrade properties for backwards compatibility.
    """
    properties = data.get("properties")
    if properties is not None and "is_archived" not in properties:
        # Add is_archived field with default value back to data
        properties["is_archived"] = False
        data["properties"] = properties
    return data
Upgrade properties for backwards compatibility.
@model_validator(mode="before")
def validate_required_fields(cls, data: dict) -> dict:
    """Validate that each tool type has the required configuration."""
    server_type = ExternalToolServer.type_from_data(data)
    properties = data.get("properties", {})

    match server_type:
        case ToolServerType.remote_mcp:
            server_url = properties.get("server_url", None)
            if server_url is None:
                raise ValueError(
                    "Server URL is required to connect to a remote MCP server"
                )
            ExternalToolServer.check_server_url(server_url)

        case ToolServerType.local_mcp:
            command = properties.get("command", None)
            if command is None:
                raise ValueError("command is required to start a local MCP server")
            if not isinstance(command, str):
                raise ValueError(
                    "command must be a string to start a local MCP server"
                )
            # Reject empty/whitespace-only command strings
            if command.strip() == "":
                raise ValueError("command must be a non-empty string")

            args = properties.get("args", None)
            if args is not None:
                if not isinstance(args, list):
                    raise ValueError(
                        "arguments must be a list to start a local MCP server"
                    )

        case ToolServerType.kiln_task:
            tool_name_validator(properties.get("name", ""))
            err_msg_prefix = "Kiln task server properties:"
            validate_return_dict_prop(
                properties, "description", str, err_msg_prefix
            )
            description = properties.get("description", "")
            if len(description) > 128:
                raise ValueError("description must be 128 characters or less")
            validate_return_dict_prop(
                properties, "is_archived", bool, err_msg_prefix
            )
            validate_return_dict_prop(properties, "task_id", str, err_msg_prefix)
            validate_return_dict_prop(
                properties, "run_config_id", str, err_msg_prefix
            )

        case _:
            # Type checking will catch missing cases
            raise_exhaustive_enum_error(server_type)
    return data
Validate that each tool type has the required configuration.
@model_validator(mode="before")
def validate_headers_and_env_vars(cls, data: dict) -> dict:
    """
    Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped
    """
    type = ExternalToolServer.type_from_data(data)

    properties = data.get("properties", {})
    if properties is None:
        raise ValueError("properties is required")

    match type:
        case ToolServerType.remote_mcp:
            # Validate headers
            headers = properties.get("headers", None)
            if headers is not None:
                ExternalToolServer.check_headers(headers)

            # Secret header keys are optional, validate if they are set
            secret_header_keys = properties.get("secret_header_keys", None)
            if secret_header_keys is not None:
                ExternalToolServer.check_secret_keys(
                    secret_header_keys, "secret_header_keys", "remote_mcp"
                )

        case ToolServerType.local_mcp:
            # Validate secret environment variable keys
            env_vars = properties.get("env_vars", {})
            if env_vars is not None:
                ExternalToolServer.check_env_vars(env_vars)

            # Secret env var keys are optional, but if they are set, they must be a list of strings
            secret_env_var_keys = properties.get("secret_env_var_keys", None)
            if secret_env_var_keys is not None:
                ExternalToolServer.check_secret_keys(
                    secret_env_var_keys, "secret_env_var_keys", "local_mcp"
                )

        case ToolServerType.kiln_task:
            pass

        case _:
            raise_exhaustive_enum_error(type)

    return data
Validate secrets; these need to be validated before model initialization because secrets will be processed and stripped
def get_secret_keys(self) -> list[str]:
    """
    Get the list of secret key names based on server type.

    Returns:
        List of secret key names (header names for remote, env var names for local)
    """
    match self.type:
        case ToolServerType.remote_mcp:
            return self.properties.get("secret_header_keys", [])
        case ToolServerType.local_mcp:
            return self.properties.get("secret_env_var_keys", [])
        case ToolServerType.kiln_task:
            return []
        case _:
            raise_exhaustive_enum_error(self.type)
Get the list of secret key names based on server type.
Returns:
    List of secret key names (header names for remote, env var names for local)
def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]:
    """
    Retrieve secrets from configuration system or in-memory storage.
    Automatically determines which secret keys to retrieve based on the server type.
    Config secrets take precedence over unsaved secrets.

    Returns:
        Tuple of (secrets_dict, missing_secrets_list) where:
        - secrets_dict: Dictionary mapping key names to their secret values
        - missing_secrets_list: List of secret key names that are missing values
    """
    secrets = {}
    missing_secrets = []
    secret_keys = self.get_secret_keys()

    if secret_keys and len(secret_keys) > 0:
        config = Config.shared()
        mcp_secrets = config.get_value(MCP_SECRETS_KEY)

        for key_name in secret_keys:
            secret_value = None

            # First check config secrets (persistent storage), key is mcp_server_id::key_name
            secret_key = self._config_secret_key(key_name)
            secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None

            # Fall back to unsaved secrets (in-memory storage)
            if (
                not secret_value
                and hasattr(self, "_unsaved_secrets")
                and key_name in self._unsaved_secrets
            ):
                secret_value = self._unsaved_secrets[key_name]

            if secret_value:
                secrets[key_name] = secret_value
            else:
                missing_secrets.append(key_name)

    return secrets, missing_secrets
Retrieve secrets from configuration system or in-memory storage. Automatically determines which secret keys to retrieve based on the server type. Config secrets take precedence over unsaved secrets.
Returns:
    Tuple of (secrets_dict, missing_secrets_list) where:
    - secrets_dict: Dictionary mapping key names to their secret values
    - missing_secrets_list: List of secret key names that are missing values
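Usage is a simple tuple unpack; here server is assumed to be an already-configured remote ExternalToolServer:

    secrets, missing = server.retrieve_secrets()
    if missing:
        # e.g. prompt the user to re-enter values that were never persisted
        print(f"Missing secret values for: {missing}")

    # One plausible way for a caller to recombine secrets with the
    # non-secret headers before connecting (remote_mcp case):
    headers = {**server.properties.get("headers", {}), **secrets}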
def delete_secrets(self) -> None:
    """
    Delete all secrets for this tool server from the configuration system.
    """
    secret_keys = self.get_secret_keys()

    config = Config.shared()
    mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]()

    # Remove secrets with the pattern: mcp_server_id::key_name
    for key_name in secret_keys:
        secret_key = self._config_secret_key(key_name)
        if secret_key in mcp_secrets:
            del mcp_secrets[secret_key]

    # Always call update_settings to maintain consistency with the old behavior
    config.update_settings({MCP_SECRETS_KEY: mcp_secrets})
Delete all secrets for this tool server from the configuration system.
def save_to_file(self) -> None:
    """
    Override save_to_file to automatically save any unsaved secrets before saving to file.

    This ensures that secrets are always saved when the object is saved,
    preventing the issue where secrets could be lost if save_to_file is called
    without explicitly saving secrets first.
    """
    # Save any unsaved secrets first
    if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets:
        self._save_secrets()

    # Call the parent save_to_file method
    super().save_to_file()
Override save_to_file to automatically save any unsaved secrets before saving to file.
This ensures that secrets are always saved when the object is saved, preventing the issue where secrets could be lost if save_to_file is called without explicitly saving secrets first.
class FineTuneStatusType(str, Enum):
    """
    The status type of a fine-tune (running, completed, failed, etc).
    """

    unknown = "unknown"  # server error
    pending = "pending"
    running = "running"
    completed = "completed"
    failed = "failed"
The status type of a fine-tune (running, completed, failed, etc).
class Finetune(KilnParentedModel):
    """
    The Kiln fine-tune datamodel.

    Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
    """

    name: FilenameString = Field(description="The name of the fine-tune.")
    description: str | None = Field(
        default=None,
        description="A description of the fine-tune for you and your team. Not used in training.",
    )
    structured_output_mode: StructuredOutputMode | None = Field(
        default=None,
        description="The mode to use to train the model for structured output, if it was trained with structured output. Will determine how we call the tuned model, so we call with the matching mode.",
    )
    provider: str = Field(
        description="The provider to use for the fine-tune (e.g. 'openai')."
    )
    base_model_id: str = Field(
        description="The id of the base model to use for the fine-tune. This string relates to the provider's IDs for their own models, not Kiln IDs."
    )
    provider_id: str | None = Field(
        default=None,
        description="The ID of the fine-tune job on the provider's side. May not be the same as the fine_tune_model_id.",
    )
    fine_tune_model_id: str | None = Field(
        default=None,
        description="The ID of the fine-tuned model on the provider's side. May not be the same as the provider_id.",
    )
    dataset_split_id: str = Field(
        description="The ID of the dataset split to use for this fine-tune.",
    )
    train_split_name: str = Field(
        default="train",
        description="The name of the training split to use for this fine-tune.",
    )
    validation_split_name: str | None = Field(
        default=None,
        description="The name of the validation split to use for this fine-tune. Optional.",
    )
    parameters: dict[str, str | int | float | bool] = Field(
        default={},
        description="The parameters to use for this fine-tune. These are provider-specific.",
    )
    # These two fields are saved exactly as used for training. Even if they map exactly to a custom prompt or generator, those can change, so we want to keep a record of the training prompt.
    system_message: str = Field(
        description="The system message to use for this fine-tune.",
    )
    thinking_instructions: str | None = Field(
        default=None,
        description="The thinking instructions to use for this fine-tune. Only used when data_strategy is final_and_intermediate.",
    )
    latest_status: FineTuneStatusType = Field(
        default=FineTuneStatusType.unknown,
        description="The latest known status of this fine-tune. Not updated in real time.",
    )
    properties: Dict[str, str | int | float] = Field(
        default={},
        description="Properties of the fine-tune. Different providers may use different properties.",
    )
    data_strategy: ChatStrategy = Field(
        default=ChatStrategy.single_turn,
        description="The strategy to use for training the model. 'final_only' will only train on the final response. 'final_and_intermediate' will train on the final response and intermediate outputs (chain of thought or reasoning).",
    )

    # Workaround to return typed parent without importing Task
    def parent_task(self) -> Union["Task", None]:
        if self.parent is None or self.parent.__class__.__name__ != "Task":
            return None
        return self.parent  # type: ignore

    @model_validator(mode="after")
    def validate_thinking_instructions(self) -> Self:
        if (
            self.thinking_instructions is not None
            and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
        ):
            raise ValueError(
                f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
            )
        if (
            self.thinking_instructions is None
            and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
        ):
            raise ValueError(
                f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
            )
        return self
The Kiln fine-tune datamodel.
Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
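A minimal sketch of recording a fine-tune job using only the required fields above; the IDs and values are invented, and task/split are assumed to exist from earlier examples:

    from kiln_ai.datamodel import Finetune

    ft = Finetune(
        name="tone-tune-v1",
        provider="openai",
        base_model_id="gpt-4o-mini",      # the provider's model ID, not a Kiln ID
        dataset_split_id=split.id,        # a DatasetSplit created earlier
        system_message="You are a helpful assistant.",  # recorded exactly as trained
        parent=task,                      # Finetune records are parented by a Task
    )
    # latest_status starts as FineTuneStatusType.unknown and is updated as the
    # provider-side job progresses; train_split_name defaults to "train".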
@model_validator(mode="after")
def validate_thinking_instructions(self) -> Self:
    if (
        self.thinking_instructions is not None
        and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
    ):
        raise ValueError(
            f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
        )
    if (
        self.thinking_instructions is None
        and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS
    ):
        raise ValueError(
            f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}"
        )
    return self
class Priority(IntEnum):
    """Defines priority levels for tasks and requirements, where P0 is highest priority."""

    p0 = 0
    p1 = 1
    p2 = 2
    p3 = 3
Defines priority levels for tasks and requirements, where P0 is highest priority.
class Project(
    KilnParentModel,
    parent_of={
        "tasks": Task,
        "documents": Document,
        "extractor_configs": ExtractorConfig,
        "chunker_configs": ChunkerConfig,
        "embedding_configs": EmbeddingConfig,
        "rag_configs": RagConfig,
        "vector_store_configs": VectorStoreConfig,
        "external_tool_servers": ExternalToolServer,
        "reranker_configs": RerankerConfig,
    },
):
    """
    A collection of related tasks.

    Projects organize tasks into logical groups and provide high-level descriptions
    of the overall goals.
    """

    name: FilenameString = Field(description="The name of the project.")
    description: str | None = Field(
        default=None,
        description="A description of the project for you and your team. Will not be used in prompts/training/validation.",
    )

    # Needed for typechecking. We should fix this in KilnParentModel
    def tasks(self, readonly: bool = False) -> list[Task]:
        return super().tasks(readonly=readonly)  # type: ignore

    def documents(self, readonly: bool = False) -> list[Document]:
        return super().documents(readonly=readonly)  # type: ignore

    def extractor_configs(self, readonly: bool = False) -> list[ExtractorConfig]:
        return super().extractor_configs(readonly=readonly)  # type: ignore

    def chunker_configs(self, readonly: bool = False) -> list[ChunkerConfig]:
        return super().chunker_configs(readonly=readonly)  # type: ignore

    def embedding_configs(self, readonly: bool = False) -> list[EmbeddingConfig]:
        return super().embedding_configs(readonly=readonly)  # type: ignore

    def vector_store_configs(self, readonly: bool = False) -> list[VectorStoreConfig]:
        return super().vector_store_configs(readonly=readonly)  # type: ignore

    def rag_configs(self, readonly: bool = False) -> list[RagConfig]:
        return super().rag_configs(readonly=readonly)  # type: ignore

    def external_tool_servers(self, readonly: bool = False) -> list[ExternalToolServer]:
        return super().external_tool_servers(readonly=readonly)  # type: ignore

    def reranker_configs(self, readonly: bool = False) -> list[RerankerConfig]:
        return super().reranker_configs(readonly=readonly)  # type: ignore
A collection of related tasks.
Projects organize tasks into logical groups and provide high-level descriptions of the overall goals.
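A brief sketch of the parent/child pattern: create a project, persist it, and read children back through the typed accessors above. save_to_file() is an assumption here, inferred from the base model (ExternalToolServer overrides it earlier in this section), and may require a path to be set first:

    from kiln_ai.datamodel import Project

    project = Project(
        name="support-bot",
        description="Models and evals for the support assistant.",
    )
    project.save_to_file()  # assumed base-model method; persists to Kiln's file-backed store

    for task in project.tasks(readonly=True):
        print(task.name)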
class Prompt(KilnParentedModel, BasePrompt):
    """
    A prompt for a task. This is the custom prompt parented by a task.
    """

    pass
A prompt for a task. This is the custom prompt parented by a task.
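A construction sketch: Prompt adds no fields of its own; everything is inherited from BasePrompt, and the parent Task relationship comes from KilnParentedModel (how the parent is attached is not shown here).

from kiln_ai.datamodel import Prompt

# Illustrative values; the name and prompt text are invented.
prompt = Prompt(
    name="concise_answer",
    prompt="Answer the user's question in two sentences or fewer.",
)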
class PromptGenerators(str, Enum):
    SIMPLE = "simple_prompt_builder"
    MULTI_SHOT = "multi_shot_prompt_builder"
    FEW_SHOT = "few_shot_prompt_builder"
    REPAIRS = "repairs_prompt_builder"
    SIMPLE_CHAIN_OF_THOUGHT = "simple_chain_of_thought_prompt_builder"
    FEW_SHOT_CHAIN_OF_THOUGHT = "few_shot_chain_of_thought_prompt_builder"
    MULTI_SHOT_CHAIN_OF_THOUGHT = "multi_shot_chain_of_thought_prompt_builder"
    SHORT = "short_prompt_builder"
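Because the enum subclasses str, members compare equal to their string ids and can be used anywhere a prompt id string is expected. A quick sketch:

from kiln_ai.datamodel import PromptGenerators

# str-valued enum: members are interchangeable with their string ids.
assert PromptGenerators.FEW_SHOT == "few_shot_prompt_builder"
print(list(PromptGenerators))  # all available generator ids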
class RequirementRating(BaseModel):
    """Rating for a specific requirement within a task output."""

    value: float = Field(
        description="The rating value. Interpretation depends on rating type"
    )
    type: TaskOutputRatingType = Field(description="The type of rating")
Rating for a specific requirement within a task output.
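A minimal construction sketch, using only the two fields defined above:

from kiln_ai.datamodel import RequirementRating, TaskOutputRatingType

# A 4-star rating for a single requirement (value is illustrative).
rating = RequirementRating(value=4.0, type=TaskOutputRatingType.five_star)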
class StructuredOutputMode(str, Enum):
    """
    Enumeration of supported structured output modes.

    - json_schema: request json using API capabilities for json_schema
    - function_calling: request json using API capabilities for function calling
    - json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema
    - json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings.
    - json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
    - json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
    - default: let the adapter decide (legacy, do not use for new use cases)
    - unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should lookup best option at runtime.
    """

    default = "default"
    json_schema = "json_schema"
    function_calling_weak = "function_calling_weak"
    function_calling = "function_calling"
    json_mode = "json_mode"
    json_instructions = "json_instructions"
    json_instruction_and_object = "json_instruction_and_object"
    json_custom_instructions = "json_custom_instructions"
    unknown = "unknown"
Enumeration of supported structured output modes.
- json_schema: request json using API capabilities for json_schema
- function_calling: request json using API capabilities for function calling
- json_mode: request json using the API's JSON mode, which should return valid JSON but does not check it against the schema
- json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used; you should have a custom parser for these models, as they return strings.
- json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
- json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
- default: let the adapter decide (legacy, do not use for new use cases)
- unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Look up the best option at runtime.
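A short sketch of selecting a mode; since the enum subclasses str, members serialize to their string ids:

from kiln_ai.datamodel import StructuredOutputMode

mode = StructuredOutputMode.json_schema
assert mode == "json_schema"  # str-valued enum
assert StructuredOutputMode("json_mode") is StructuredOutputMode.json_mode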
class Task(
    KilnParentedModel,
    KilnParentModel,
    parent_of={
        "runs": TaskRun,
        "dataset_splits": DatasetSplit,
        "finetunes": Finetune,
        "prompts": Prompt,
        "evals": Eval,
        "run_configs": TaskRunConfig,
    },
):
    """
    Represents a specific task to be performed, with associated requirements and validation rules.

    Contains the task definition, requirements, input/output schemas, and maintains
    a collection of task runs.
    """

    name: FilenameString = Field(description="The name of the task.")
    description: str | None = Field(
        default=None,
        description="A description of the task for you and your team. Will not be used in prompts/training/validation.",
    )
    instruction: str = Field(
        min_length=1,
        description="The instructions for the task. Will be used in prompts/training/validation.",
    )
    requirements: List[TaskRequirement] = Field(default=[])
    # Output must be an object schema, as things like tool calls only allow objects
    output_json_schema: JsonObjectSchema | None = None
    # Inputs are more flexible, allowing arrays
    input_json_schema: JsonSchema | None = None
    thinking_instruction: str | None = Field(
        default=None,
        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting.",
    )

    default_run_config_id: ID_TYPE | None = Field(
        default=None,
        description="ID of the run config to use for this task by default. Must exist in saved run configs for this task.",
    )

    def output_schema(self) -> Dict | None:
        if self.output_json_schema is None:
            return None
        return schema_from_json_str(self.output_json_schema)

    def input_schema(self) -> Dict | None:
        if self.input_json_schema is None:
            return None
        # Allow arrays, not just objects
        return schema_from_json_str(self.input_json_schema, require_object=False)

    # These wrappers help for typechecking. We should fix this in KilnParentModel
    def runs(self, readonly: bool = False) -> list[TaskRun]:
        return super().runs(readonly=readonly)  # type: ignore

    def dataset_splits(self, readonly: bool = False) -> list[DatasetSplit]:
        return super().dataset_splits(readonly=readonly)  # type: ignore

    def finetunes(self, readonly: bool = False) -> list[Finetune]:
        return super().finetunes(readonly=readonly)  # type: ignore

    def prompts(self, readonly: bool = False) -> list[Prompt]:
        return super().prompts(readonly=readonly)  # type: ignore

    def evals(self, readonly: bool = False) -> list[Eval]:
        return super().evals(readonly=readonly)  # type: ignore

    def run_configs(self, readonly: bool = False) -> list[TaskRunConfig]:
        return super().run_configs(readonly=readonly)  # type: ignore

    # Workaround to return typed parent without importing Task
    def parent_project(self) -> Union["Project", None]:
        if self.parent is None or self.parent.__class__.__name__ != "Project":
            return None
        return self.parent  # type: ignore
Represents a specific task to be performed, with associated requirements and validation rules.
Contains the task definition, requirements, input/output schemas, and maintains a collection of task runs.
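For illustration, a sketch of a task with a structured output schema. Note that output_json_schema is stored as a JSON string, which output_schema() parses to a dict; attaching the task to a Project parent is omitted here, and the task name and schema are invented.

import json

from kiln_ai.datamodel import Task

task = Task(
    name="Summarize",
    instruction="Summarize the provided article in three sentences.",
    # Schemas are stored as JSON strings; outputs must be object schemas.
    output_json_schema=json.dumps(
        {
            "type": "object",
            "properties": {"summary": {"type": "string"}},
            "required": ["summary"],
        }
    ),
)
print(task.output_schema())  # parsed dict, or None when no schema is set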
class TaskOutput(KilnBaseModel):
    """
    An output for a specific task run.

    Contains the actual output content, its source (human or synthetic),
    and optional rating information.
    """

    output: str = Field(
        description="The output of the task. JSON formatted for structured output, plaintext for unstructured output."
    )
    source: DataSource | None = Field(
        description="The source of the output: human or synthetic.",
        default=None,
    )
    rating: TaskOutputRating | None = Field(
        default=None, description="The rating of the output"
    )

    def validate_output_format(self, task: "Task") -> Self:
        # validate output
        if task.output_json_schema is not None:
            try:
                output_parsed = json.loads(self.output)
            except json.JSONDecodeError:
                raise ValueError("Output is not a valid JSON object")

            validate_schema_with_value_error(
                output_parsed,
                task.output_json_schema,
                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )
        return self

    @model_validator(mode="after")
    def validate_output_source(self, info: ValidationInfo) -> Self:
        # On strict mode and not loaded from file, we validate output_source is not None.
        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
        if not strict_mode():
            return self
        if self.loaded_from_file(info):
            return self
        if self.source is None:
            raise ValueError("Output source is required when strict mode is enabled")
        return self
An output for a specific task run.
Contains the actual output content, its source (human or synthetic), and optional rating information.
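A sketch of a plaintext output with a human source; the created_by property is required for human sources per the DataSource property rules, and the values here are illustrative:

from kiln_ai.datamodel import DataSource, DataSourceType, TaskOutput

output = TaskOutput(
    output="Paris is the capital of France.",
    # In strict mode, newly created outputs must carry a source.
    source=DataSource(
        type=DataSourceType.human,
        properties={"created_by": "jane_doe"},
    ),
)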
class TaskOutputRating(KilnBaseModel):
    """
    A rating for a task output, including an overall rating and ratings for each requirement.

    Supports:
    - five_star: 1-5 star ratings
    - pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
    - pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
    """

    type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star)
    value: float | None = Field(
        description="The rating value. Interpretation depends on rating type:\n- five_star: 1-5 stars\n- pass_fail: 1.0 (pass) or 0.0 (fail)\n- pass_fail_critical: 1.0 (pass), 0.0 (fail), or -1.0 (critical fail)",
        default=None,
    )
    requirement_ratings: Dict[ID_TYPE, RequirementRating] = Field(
        default={},
        description="The ratings of the requirements of the task. The ID can be either a task_requirement_id or a named rating for an eval_output_score name (in format 'named::<name>').",
    )

    # Previously we stored rating values as a dict of floats, but now we store them as RequirementRating objects.
    @model_validator(mode="before")
    def upgrade_old_format(cls, data: dict) -> dict:
        if not isinstance(data, dict):
            return data

        # Check if we have the old format (dict of floats)
        req_ratings = data.get("requirement_ratings", {})
        if req_ratings and all(
            isinstance(v, (int, float)) for v in req_ratings.values()
        ):
            # Convert each float to a RequirementRating object
            # all ratings are five star at the point we used this format
            data["requirement_ratings"] = {
                k: {"value": v, "type": TaskOutputRatingType.five_star}
                for k, v in req_ratings.items()
            }

        return data

    # Used to select high quality outputs for example selection (MultiShotPromptBuilder, etc)
    def is_high_quality(self) -> bool:
        if self.value is None:
            return False

        if self.type == TaskOutputRatingType.five_star:
            return self.value >= 4
        elif self.type == TaskOutputRatingType.pass_fail:
            return self.value == 1.0
        elif self.type == TaskOutputRatingType.pass_fail_critical:
            return self.value == 1.0
        return False

    @model_validator(mode="after")
    def validate_rating(self) -> Self:
        if self.type not in TaskOutputRatingType:
            raise ValueError(f"Invalid rating type: {self.type}")

        # Overall rating is optional
        if self.value is not None:
            self._validate_rating(self.type, self.value, "overall rating")

        for req_id, req_rating in self.requirement_ratings.items():
            self._validate_rating(
                req_rating.type,
                req_rating.value,
                f"requirement rating for req ID: {req_id}",
            )

        return self

    def _validate_rating(
        self, type: TaskOutputRatingType, rating: float | None, rating_name: str
    ) -> None:
        if type == TaskOutputRatingType.five_star:
            self._validate_five_star(rating, rating_name)
        elif type == TaskOutputRatingType.pass_fail:
            self._validate_pass_fail(rating, rating_name)
        elif type == TaskOutputRatingType.pass_fail_critical:
            self._validate_pass_fail_critical(rating, rating_name)

    def _validate_five_star(self, rating: float | None, rating_name: str) -> None:
        if rating is None or not isinstance(rating, float) or not rating.is_integer():
            raise ValueError(
                f"{rating_name.capitalize()} of type five_star must be an integer value (1-5)"
            )
        if rating < 1 or rating > 5:
            raise ValueError(
                f"{rating_name.capitalize()} of type five_star must be between 1 and 5 stars"
            )

    def _validate_pass_fail(self, rating: float | None, rating_name: str) -> None:
        if rating is None or not isinstance(rating, float) or not rating.is_integer():
            raise ValueError(
                f"{rating_name.capitalize()} of type pass_fail must be an integer value (0 or 1)"
            )
        if rating not in [0, 1]:
            raise ValueError(
                f"{rating_name.capitalize()} of type pass_fail must be 0 (fail) or 1 (pass)"
            )

    def _validate_pass_fail_critical(
        self, rating: float | None, rating_name: str
    ) -> None:
        if rating is None or not isinstance(rating, float) or not rating.is_integer():
            raise ValueError(
                f"{rating_name.capitalize()} of type pass_fail_critical must be an integer value (-1, 0, or 1)"
            )
        if rating not in [-1, 0, 1]:
            raise ValueError(
                f"{rating_name.capitalize()} of type pass_fail_critical must be -1 (critical fail), 0 (fail), or 1 (pass)"
            )
A rating for a task output, including an overall rating and ratings for each requirement.
Supports:
- five_star: 1-5 star ratings
- pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
- pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
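The validators above require whole-number float values; a sketch:

from kiln_ai.datamodel import TaskOutputRating, TaskOutputRatingType

rating = TaskOutputRating(type=TaskOutputRatingType.five_star, value=5.0)
assert rating.is_high_quality()  # five_star ratings of 4+ count as high quality

# A value like 4.5 would raise: five_star ratings must be integers from 1 to 5.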
class TaskOutputRatingType(str, Enum):
    """Defines the types of rating systems available for task outputs."""

    five_star = "five_star"
    pass_fail = "pass_fail"
    pass_fail_critical = "pass_fail_critical"
    custom = "custom"
Defines the types of rating systems available for task outputs.
class TaskRequirement(BaseModel):
    """
    Defines a specific requirement that should be met by task outputs.

    Includes an identifier, name, description, instruction for meeting the requirement,
    priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
    """

    id: ID_TYPE = ID_FIELD
    name: FilenameStringShort = Field(description="The name of the task requirement.")
    description: str | None = Field(default=None)
    instruction: str = Field(min_length=1)
    priority: Priority = Field(default=Priority.p2)
    type: TaskOutputRatingType = Field(default=TaskOutputRatingType.five_star)
Defines a specific requirement that should be met by task outputs.
Includes an identifier, name, description, instruction for meeting the requirement, priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
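A construction sketch. The field names are from the class above; Priority.p1 is assumed to exist alongside the p2 default shown in the definition, and the requirement text is invented:

from kiln_ai.datamodel import Priority, TaskOutputRatingType, TaskRequirement

requirement = TaskRequirement(
    name="No Speculation",
    instruction="Only state facts present in the source text.",
    priority=Priority.p1,  # assumed member; p2 is the default
    type=TaskOutputRatingType.pass_fail,
)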
class TaskRun(KilnParentedModel):
    """
    Represents a single execution of a Task.

    Contains the input used, its source, the output produced, and optional
    repair information if the output needed correction.
    """

    input: str = Field(
        description="The inputs to the task. JSON formatted for structured input, plaintext for unstructured input."
    )
    input_source: DataSource | None = Field(
        default=None, description="The source of the input: human or synthetic."
    )

    output: TaskOutput = Field(description="The output of the task run.")
    repair_instructions: str | None = Field(
        default=None,
        description="Instructions for fixing the output. Should define what is wrong, and how to fix it. Will be used by models for both generating a fixed output, and evaluating future models.",
    )
    repaired_output: TaskOutput | None = Field(
        default=None,
        description="A version of the output with issues fixed. This must be a 'fixed' version of the existing output, and not an entirely new output. If you wish to generate an ideal curatorial output for this task unrelated to this output, generate a new TaskOutput with type 'human' instead of using this field.",
    )
    intermediate_outputs: Dict[str, str] | None = Field(
        default=None,
        description="Intermediate outputs from the task run. Keys are the names of the intermediate output steps (cot=chain of thought, etc), values are the output data.",
    )
    tags: List[str] = Field(
        default=[],
        description="Tags for the task run. Tags are used to categorize task runs for filtering and reporting.",
    )
    usage: Usage | None = Field(
        default=None,
        description="Usage information for the task run. This includes the number of input tokens, output tokens, and total tokens used.",
    )
    trace: list[ChatCompletionMessageParam] | None = Field(
        default=None,
        description="The trace of the task run in OpenAI format. This is the list of messages that were sent to/from the model.",
    )

    def thinking_training_data(self) -> str | None:
        """
        Get the thinking training data from the task run.
        """
        if self.intermediate_outputs is None:
            return None
        return self.intermediate_outputs.get(
            "reasoning"
        ) or self.intermediate_outputs.get("chain_of_thought")

    def has_thinking_training_data(self) -> bool:
        """
        Does this run have thinking data that we can use to train a thinking model?
        """
        return self.thinking_training_data() is not None

    # Workaround to return typed parent without importing Task
    def parent_task(self) -> Union["Task", None]:
        if self.parent is None or self.parent.__class__.__name__ != "Task":
            return None
        return self.parent  # type: ignore

    @model_validator(mode="after")
    def validate_input_format(self, info: ValidationInfo) -> Self:
        # Don't validate if loading from file (not new). Too slow.
        # We don't allow changing task schema, so this is redundant validation.
        # Note: we still validate if editing a loaded model
        if self.loading_from_file(info):
            # Consider loading an existing model as validated.
            self._last_validated_input = self.input
            return self

        # Don't validate if input has not changed. Too slow to run this every time.
        if (
            hasattr(self, "_last_validated_input")
            and self.input == self._last_validated_input
        ):
            return self

        task = self.parent_task()
        if task is None:
            # don't validate this relationship until we have a path or parent.
            # Give them time to build it (but will catch it before saving)
            return self

        # validate input
        if task.input_json_schema is not None:
            try:
                input_parsed = json.loads(self.input)
            except json.JSONDecodeError:
                raise ValueError("Input is not a valid JSON object")

            validate_schema_with_value_error(
                input_parsed,
                task.input_json_schema,
                "Input does not match task input schema.",
                require_object=False,
            )

        self._last_validated_input = self.input
        return self

    @model_validator(mode="after")
    def validate_output_format(self, info: ValidationInfo) -> Self:
        # Don't validate if loading from file (not new). Too slow.
        # Note: we still validate if editing a loaded model's output.
        if self.loading_from_file(info):
            # Consider loading an existing model as validated.
            self._last_validated_output = self.output.output if self.output else None
            return self

        # Don't validate unless output has changed since last validation.
        # The validator is slow and costly, don't want it running when setting other fields.
        if (
            hasattr(self, "_last_validated_output")
            and self.output is not None
            and self.output.output == self._last_validated_output
        ):
            return self

        task = self.parent_task()
        if task is None:
            return self

        self.output.validate_output_format(task)
        self._last_validated_output = self.output.output if self.output else None
        return self

    @model_validator(mode="after")
    def validate_repaired_output(self) -> Self:
        if self.repaired_output is not None:
            if self.repaired_output.rating is not None:
                raise ValueError(
                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
                )

            task = self.parent_task()
            if (
                task is not None
                and self.repaired_output.output is not None
                and task.output_json_schema is not None
            ):
                try:
                    output_parsed = json.loads(self.repaired_output.output)
                except json.JSONDecodeError:
                    raise ValueError("Repaired output is not a valid JSON object")

                validate_schema_with_value_error(
                    output_parsed,
                    task.output_json_schema,
                    "Repaired output does not match task output schema.",
                )

        if self.repair_instructions is None and self.repaired_output is not None:
            raise ValueError(
                "Repair instructions are required if providing a repaired output."
            )
        if self.repair_instructions is not None and self.repaired_output is None:
            raise ValueError(
                "A repaired output is required if providing repair instructions."
            )

        return self

    @model_validator(mode="after")
    def validate_input_source(self, info: ValidationInfo) -> Self:
        # On strict mode and not loaded from file, we validate input_source is not None.
        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
        if not strict_mode():
            return self
        if self.loaded_from_file(info):
            return self
        if self.input_source is None:
            raise ValueError("input_source is required when strict mode is enabled")
        return self

    @model_validator(mode="after")
    def validate_tags(self) -> Self:
        for tag in self.tags:
            if not tag:
                raise ValueError("Tags cannot be empty strings")
            if " " in tag:
                raise ValueError("Tags cannot contain spaces. Try underscores.")

        return self
Represents a single execution of a Task.
Contains the input used, its source, the output produced, and optional repair information if the output needed correction.
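A sketch of a complete run record with illustrative values. Schema validation against the parent Task only runs once the run is parented, so this example stands alone:

from kiln_ai.datamodel import DataSource, DataSourceType, TaskOutput, TaskRun

human_source = DataSource(
    type=DataSourceType.human,
    properties={"created_by": "jane_doe"},
)

run = TaskRun(
    input="What is the capital of France?",
    input_source=human_source,
    output=TaskOutput(output="Paris.", source=human_source),
    tags=["geography", "smoke_test"],  # tags may not contain spaces
)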
class Usage(BaseModel):
    input_tokens: int | None = Field(
        default=None,
        description="The number of input tokens used in the task run.",
        ge=0,
    )
    output_tokens: int | None = Field(
        default=None,
        description="The number of output tokens used in the task run.",
        ge=0,
    )
    total_tokens: int | None = Field(
        default=None,
        description="The total number of tokens used in the task run.",
        ge=0,
    )
    cost: float | None = Field(
        default=None,
        description="The cost of the task run in US dollars, saved at runtime (prices can change over time).",
        ge=0,
    )

    def __add__(self, other: "Usage") -> "Usage":
        """Add two Usage objects together, handling None values gracefully.

        None + None = None
        None + value = value
        value + None = value
        value1 + value2 = value1 + value2
        """
        if not isinstance(other, Usage):
            raise TypeError(f"Cannot add Usage with {type(other).__name__}")

        def _add_optional_int(a: int | None, b: int | None) -> int | None:
            if a is None and b is None:
                return None
            if a is None:
                return b
            if b is None:
                return a
            return a + b

        def _add_optional_float(a: float | None, b: float | None) -> float | None:
            if a is None and b is None:
                return None
            if a is None:
                return b
            if b is None:
                return a
            return a + b

        return Usage(
            input_tokens=_add_optional_int(self.input_tokens, other.input_tokens),
            output_tokens=_add_optional_int(self.output_tokens, other.output_tokens),
            total_tokens=_add_optional_int(self.total_tokens, other.total_tokens),
            cost=_add_optional_float(self.cost, other.cost),
        )
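Per __add__ above, None is treated as "unknown" rather than zero when aggregating usage across runs. A sketch with illustrative token counts:

from kiln_ai.datamodel import Usage

a = Usage(input_tokens=100, output_tokens=20, total_tokens=120)
b = Usage(input_tokens=50, cost=0.001)

combined = a + b
assert combined.input_tokens == 150
assert combined.output_tokens == 20  # None on one side keeps the known value
assert combined.cost == 0.001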