kiln_ai.datamodel
See our docs for details about our datamodel classes and hierarchy:
Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html
"""
See our docs for details about our datamodel classes and hierarchy:

Developer docs: https://kiln-ai.github.io/Kiln/kiln_core_docs/kiln_ai.html

User docs: https://docs.kiln.tech/developers/kiln-datamodel
"""

# This component uses "flat" imports so we don't have too much internal structure exposed in the API.
# for example you can just `from datamodel import Task, Project` instead of `from datamodel.task import Task; from datamodel.project import Project`

from __future__ import annotations

# Whole submodules re-exported as part of the flat public API.
from kiln_ai.datamodel import (
    chunk,
    dataset_split,
    embedding,
    eval,
    extraction,
    rag,
    reranker,
    strict_mode,
)
from kiln_ai.datamodel.basemodel import generate_model_id

# Enums shared across the datamodel.
from kiln_ai.datamodel.datamodel_enums import (
    FeedbackSource,
    FineTuneStatusType,
    Priority,
    StructuredOutputMode,
    TaskOutputRatingType,
)

# Individual model classes, re-exported flat.
from kiln_ai.datamodel.dataset_split import DatasetSplit, DatasetSplitDefinition
from kiln_ai.datamodel.external_tool_server import ExternalToolServer
from kiln_ai.datamodel.feedback import Feedback
from kiln_ai.datamodel.finetune import Finetune
from kiln_ai.datamodel.project import Project
from kiln_ai.datamodel.prompt import BasePrompt, Prompt
from kiln_ai.datamodel.prompt_id import (
    PromptGenerators,
    PromptId,
    prompt_generator_values,
)
from kiln_ai.datamodel.prompt_optimization_job import PromptOptimizationJob
from kiln_ai.datamodel.skill import Skill
from kiln_ai.datamodel.task import Task, TaskRequirement
from kiln_ai.datamodel.task_output import (
    DataSource,
    DataSourceProperty,
    DataSourceType,
    RequirementRating,
    TaskOutput,
    TaskOutputRating,
)
from kiln_ai.datamodel.task_run import TaskRun, Usage

# Explicit public API of the flat namespace.
__all__ = [
    "BasePrompt",
    "DataSource",
    "DataSourceProperty",
    "DataSourceType",
    "DatasetSplit",
    "DatasetSplitDefinition",
    "ExternalToolServer",
    "Feedback",
    "FeedbackSource",
    "FineTuneStatusType",
    "Finetune",
    "Priority",
    "Project",
    "Prompt",
    "PromptGenerators",
    "PromptId",
    "PromptOptimizationJob",
    "RequirementRating",
    "Skill",
    "StructuredOutputMode",
    "Task",
    "TaskOutput",
    "TaskOutputRating",
    "TaskOutputRatingType",
    "TaskRequirement",
    "TaskRun",
    "Usage",
    "chunk",
    "dataset_split",
    "embedding",
    "eval",
    "extraction",
    "generate_model_id",
    "prompt_generator_values",
    "rag",
    "reranker",
    "strict_mode",
]
class BasePrompt(BaseModel):
    """
    A prompt for a task. This is the basic data storage format which can be used throughout a project.

    The "Prompt" model name is reserved for the custom prompts parented by a task.
    """

    # NOTE(review): FilenameString presumably constrains the name to filename-safe text — confirm in basemodel.
    name: FilenameString = Field(description="The name of the prompt.")
    description: str | None = Field(
        default=None,
        description="A more detailed description of the prompt.",
    )
    # Optional id of the automatic generator that produced this prompt (default None).
    generator_id: str | None = Field(
        default=None,
        description="The id of the generator that created this prompt.",
    )
    # min_length=1 makes an empty prompt a validation error.
    prompt: str = Field(
        description="The prompt for the task.",
        min_length=1,
    )
    # Chain-of-thought is only enabled when this field is set (see description).
    chain_of_thought_instructions: str | None = Field(
        default=None,
        description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting. COT will not be used unless this is provided.",
    )
A prompt for a task. This is the basic data storage format which can be used throughout a project.
The "Prompt" model name is reserved for the custom prompts parented by a task.
class DataSource(BaseModel):
    """
    Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.

    Properties vary based on the source type - for synthetic/tool_call sources this includes
    model information, for human sources this includes creator information, for file imports
    this includes file information.
    """

    type: DataSourceType = Field(description="The type of data source.")
    properties: Dict[str, str | int | float] = Field(
        default={},
        description="Properties describing the data source. For synthetic things like model. For human: the human's name. For file_import: file information.",
    )
    run_config: Optional[RunConfigProperties] = Field(
        default=None,
        description="The run config used to generate the data, if generated by a running a model in Kiln (only true for type=synthetic).",
    )

    # Declarative schema for `properties`: each entry names a property, its value type,
    # which source types require it, and which source types forbid it.
    # Enforced by validate_properties below.
    _data_source_properties = [
        DataSourceProperty(
            name="created_by",
            type=str,
            required_for=[DataSourceType.human],
            not_allowed_for=[
                DataSourceType.synthetic,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="model_provider",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="adapter_name",
            type=str,
            required_for=[DataSourceType.synthetic],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # Legacy field -- allow loading from old runs, but we shouldn't be setting it.
            name="prompt_builder_name",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            # The PromptId of the prompt. Can be a saved prompt, fine-tune, generator name, etc. See PromptId type for more details.
            name="prompt_id",
            type=str,
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.file_import,
                DataSourceType.tool_call,
            ],
        ),
        DataSourceProperty(
            name="file_name",
            type=str,
            required_for=[DataSourceType.file_import],
            not_allowed_for=[
                DataSourceType.human,
                DataSourceType.synthetic,
                DataSourceType.tool_call,
            ],
        ),
    ]

    @model_validator(mode="after")
    def validate_type(self) -> "DataSource":
        # Defensive re-check; pydantic already coerces `type` to DataSourceType.
        if self.type not in DataSourceType:
            raise ValueError(f"Invalid data source type: {self.type}")
        return self

    @model_validator(mode="after")
    def validate_properties(self) -> "DataSource":
        # Enforce the schema declared in _data_source_properties above.
        for prop in self._data_source_properties:
            # Check the property type is correct
            if prop.name in self.properties:
                if not isinstance(self.properties[prop.name], prop.type):
                    raise ValueError(
                        f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source"
                    )
            # Check the property is required for the data source type
            if self.type in prop.required_for:
                if prop.name not in self.properties:
                    raise ValueError(
                        f"'{prop.name}' is required for {self.type} data source"
                    )
            # Check the property is not allowed for the data source type
            elif self.type in prop.not_allowed_for and prop.name in self.properties:
                raise ValueError(
                    f"'{prop.name}' is not allowed for {self.type} data source"
                )
        return self

    @model_validator(mode="after")
    def validate_no_empty_properties(self) -> Self:
        # Empty-string property values are rejected regardless of source type.
        for prop, value in self.properties.items():
            if isinstance(value, str) and value == "":
                raise ValueError(
                    f"Property '{prop}' must be a non-empty string for {self.type} data source"
                )
        return self
Represents the origin of data, either human, synthetic, file import, or tool call, with associated properties.
Properties vary based on the source type - for synthetic/tool_call sources this includes model information, for human sources this includes creator information, for file imports this includes file information.
293 @model_validator(mode="after") 294 def validate_properties(self) -> "DataSource": 295 for prop in self._data_source_properties: 296 # Check the property type is correct 297 if prop.name in self.properties: 298 if not isinstance(self.properties[prop.name], prop.type): 299 raise ValueError( 300 f"'{prop.name}' must be of type {prop.type.__name__} for {self.type} data source" 301 ) 302 # Check the property is required for the data source type 303 if self.type in prop.required_for: 304 if prop.name not in self.properties: 305 raise ValueError( 306 f"'{prop.name}' is required for {self.type} data source" 307 ) 308 # Check the property is not allowed for the data source type 309 elif self.type in prop.not_allowed_for and prop.name in self.properties: 310 raise ValueError( 311 f"'{prop.name}' is not allowed for {self.type} data source" 312 ) 313 return self
315 @model_validator(mode="after") 316 def validate_no_empty_properties(self) -> Self: 317 for prop, value in self.properties.items(): 318 if isinstance(value, str) and value == "": 319 raise ValueError( 320 f"Property '{prop}' must be a non-empty string for {self.type} data source" 321 ) 322 return self
Configuration for the model; should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise ``__pydantic_private__`` on the instance if not already set.

    Behaves like a BaseModel method; *context* is accepted because that's what
    pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Already initialised — leave the existing private-attribute dict alone.
    if getattr(self, '__pydantic_private__', None) is not None:
        return
    # Collect only the private attributes that have a concrete default.
    defaults = {
        name: default
        for name, attr in self.__private_attributes__.items()
        if (default := attr.get_default()) is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', defaults)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The initialization context passed by pydantic-core.
class DataSourceProperty(BaseModel):
    """
    Defines a property that can be associated with a data source.

    Includes validation rules for when properties are required or not allowed
    based on the data source type.
    """

    # Key under which the property appears in DataSource.properties.
    name: str
    # Python type the property value must be an instance of.
    type: Type[Union[str, int, float]]
    # Source types for which this property must be present.
    required_for: List[DataSourceType] = []
    # Source types for which this property must be absent.
    not_allowed_for: List[DataSourceType] = []
Defines a property that can be associated with a data source.
Includes validation rules for when properties are required or not allowed based on the data source type.
class DataSourceType(str, Enum):
    """
    The source type of a piece of data.

    Human: a human created the data
    Synthetic: a model created the data
    File import: the data was imported from a file
    Tool call: the data came from a tool call
    """

    human = "human"
    synthetic = "synthetic"
    file_import = "file_import"
    tool_call = "tool_call"
The source type of a piece of data.
Human: a human created the data. Synthetic: a model created the data. File import: the data was imported from a file. Tool call: the data came from a tool call.
class DatasetSplit(KilnParentedModel):
    """
    A collection of task runs, with optional splits (train, test, validation).

    Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.

    Maintains a list of IDs for each split, to avoid data duplication.
    """

    name: FilenameString = Field(description="The name of the dataset split.")
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    splits: list[DatasetSplitDefinition] = Field(
        default_factory=list,
        description="The splits in the dataset.",
    )
    split_contents: dict[str, list[str]] = Field(
        description="The contents of each split in the dataset. The key is the split name, and the value is a list of task run IDs.",
    )
    filter: DatasetFilterId | None = Field(
        default=None,
        description="The filter used to build the dataset.",
    )

    @model_validator(mode="after")
    def validate_split_percentages(self) -> "DatasetSplit":
        # Percentages must sum to exactly 1.0 (within float tolerance).
        total = sum(split.percentage for split in self.splits)
        if not math.isclose(total, 1.0, rel_tol=1e-9):
            raise ValueError(f"The sum of split percentages must be 1.0 (got {total})")
        return self

    @classmethod
    def from_task(
        cls,
        name: str,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter_id: DatasetFilterId = "all",
        description: str | None = None,
    ):
        """
        Build a dataset split from a task.
        """
        filter = dataset_filter_from_id(filter_id)
        split_contents = cls.build_split_contents(task, splits, filter)
        return cls(
            parent=task,
            name=name,
            description=description,
            splits=splits,
            split_contents=split_contents,
            filter=filter_id,
        )

    @classmethod
    def build_split_contents(
        cls,
        task: "Task",
        splits: list[DatasetSplitDefinition],
        filter: DatasetFilter,
    ) -> dict[str, list[str]]:
        # Collect the IDs of runs that pass the filter predicate.
        valid_ids = []
        for task_run in task.runs():
            if filter(task_run):
                valid_ids.append(task_run.id)

        # Shuffle and split by split percentage
        random.shuffle(valid_ids)
        split_contents = {}
        start_idx = 0
        # NOTE(review): remaining_items is updated but never read — candidate for removal.
        remaining_items = len(valid_ids)

        # Handle all splits except the last one
        for split in splits[:-1]:
            split_size = round(len(valid_ids) * split.percentage)
            split_contents[split.name] = valid_ids[start_idx : start_idx + split_size]
            start_idx += split_size
            remaining_items -= split_size

        # Last split gets all remaining items (for rounding)
        if splits:
            split_contents[splits[-1].name] = valid_ids[start_idx:]

        return split_contents

    def parent_task(self) -> "Task | None":
        # inline import to avoid circular import
        from kiln_ai.datamodel import Task

        if not isinstance(self.parent, Task):
            return None
        return self.parent

    def missing_count(self) -> int:
        """
        Returns:
            int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
        """
        parent = self.parent_task()
        if parent is None:
            raise ValueError("DatasetSplit has no parent task")

        runs = parent.runs(readonly=True)
        all_ids = set(run.id for run in runs)
        all_ids_in_splits = set()
        for ids in self.split_contents.values():
            all_ids_in_splits.update(ids)
        # IDs referenced by the split but no longer present in the dataset.
        missing = all_ids_in_splits - all_ids
        return len(missing)

    def _get_runs(self) -> list[TaskRun]:
        """
        Get all task runs referenced in this dataset split.

        Returns:
            list[TaskRun]: list of task runs in this dataset split
        """
        parent = self.parent_task()
        if parent is None:
            return []

        runs = []
        all_run_ids = set()
        for run_ids in self.split_contents.values():
            all_run_ids.update(run_ids)

        # Find all runs by their IDs
        for task_run in parent.runs(readonly=True):
            if task_run.id in all_run_ids:
                runs.append(task_run)

        return runs

    @staticmethod
    def compute_tool_info(runs: list[TaskRun]) -> DatasetToolInfo:
        """
        Compute tool info from a list of task runs.

        Args:
            runs: list of task runs to analyze

        Returns:
            DatasetToolInfo: information about tools used across the task runs
        """

        has_tool_mismatch = False
        # None is a sentinel: "no expected tool set established yet" (or invalidated by a mismatch).
        tools: set[str] | None = None

        for run in runs:
            # Extract tools from run config, treating missing source/run_config/tools_config as empty tools
            run_tools: set[str] = set()
            source = run.output.source if run.output else None
            if source is not None and isinstance(
                source.run_config, KilnAgentRunConfigProperties
            ):
                tools_config = source.run_config.tools_config
                if tools_config is not None:
                    run_tools = set(tools_config.tools)

            # First run establishes the expected tool set (including empty)
            if tools is None:
                tools = run_tools
            elif run_tools != tools:
                # Mismatch found
                has_tool_mismatch = True
                tools = None
                break

        # If no valid runs were processed, return empty tools
        if tools is None:
            if not has_tool_mismatch:
                tools = set()

        return DatasetToolInfo(
            has_tool_mismatch=has_tool_mismatch,
            tools=None if tools is None else sorted(tools),
        )

    def tool_info(self) -> DatasetToolInfo:
        """
        Helper method to compute tool info for the dataset split. Iterate through all runs in the dataset split and check the tools used in each run config.

        Returns:
            DatasetToolInfo: information about tools used across task runs in this dataset split
        """
        runs = self._get_runs()
        tool_info = self.compute_tool_info(runs)
        return tool_info
A collection of task runs, with optional splits (train, test, validation).
Used to freeze a dataset into train/test/validation splits for repeatable fine-tuning or other tasks.
Maintains a list of IDs for each split, to avoid data duplication.
109 @model_validator(mode="after") 110 def validate_split_percentages(self) -> "DatasetSplit": 111 total = sum(split.percentage for split in self.splits) 112 if not math.isclose(total, 1.0, rel_tol=1e-9): 113 raise ValueError(f"The sum of split percentages must be 1.0 (got {total})") 114 return self
116 @classmethod 117 def from_task( 118 cls, 119 name: str, 120 task: "Task", 121 splits: list[DatasetSplitDefinition], 122 filter_id: DatasetFilterId = "all", 123 description: str | None = None, 124 ): 125 """ 126 Build a dataset split from a task. 127 """ 128 filter = dataset_filter_from_id(filter_id) 129 split_contents = cls.build_split_contents(task, splits, filter) 130 return cls( 131 parent=task, 132 name=name, 133 description=description, 134 splits=splits, 135 split_contents=split_contents, 136 filter=filter_id, 137 )
Build a dataset split from a task.
139 @classmethod 140 def build_split_contents( 141 cls, 142 task: "Task", 143 splits: list[DatasetSplitDefinition], 144 filter: DatasetFilter, 145 ) -> dict[str, list[str]]: 146 valid_ids = [] 147 for task_run in task.runs(): 148 if filter(task_run): 149 valid_ids.append(task_run.id) 150 151 # Shuffle and split by split percentage 152 random.shuffle(valid_ids) 153 split_contents = {} 154 start_idx = 0 155 remaining_items = len(valid_ids) 156 157 # Handle all splits except the last one 158 for split in splits[:-1]: 159 split_size = round(len(valid_ids) * split.percentage) 160 split_contents[split.name] = valid_ids[start_idx : start_idx + split_size] 161 start_idx += split_size 162 remaining_items -= split_size 163 164 # Last split gets all remaining items (for rounding) 165 if splits: 166 split_contents[splits[-1].name] = valid_ids[start_idx:] 167 168 return split_contents
178 def missing_count(self) -> int: 179 """ 180 Returns: 181 int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset 182 """ 183 parent = self.parent_task() 184 if parent is None: 185 raise ValueError("DatasetSplit has no parent task") 186 187 runs = parent.runs(readonly=True) 188 all_ids = set(run.id for run in runs) 189 all_ids_in_splits = set() 190 for ids in self.split_contents.values(): 191 all_ids_in_splits.update(ids) 192 missing = all_ids_in_splits - all_ids 193 return len(missing)
Returns: int: the number of task runs that have an ID persisted in this dataset split, but no longer exist in the dataset
218 @staticmethod 219 def compute_tool_info(runs: list[TaskRun]) -> DatasetToolInfo: 220 """ 221 Compute tool info from a list of task runs. 222 223 Args: 224 runs: list of task runs to analyze 225 226 Returns: 227 DatasetToolInfo: information about tools used across the task runs 228 """ 229 230 has_tool_mismatch = False 231 tools: set[str] | None = None 232 233 for run in runs: 234 # Extract tools from run config, treating missing source/run_config/tools_config as empty tools 235 run_tools: set[str] = set() 236 source = run.output.source if run.output else None 237 if source is not None and isinstance( 238 source.run_config, KilnAgentRunConfigProperties 239 ): 240 tools_config = source.run_config.tools_config 241 if tools_config is not None: 242 run_tools = set(tools_config.tools) 243 244 # First run establishes the expected tool set (including empty) 245 if tools is None: 246 tools = run_tools 247 elif run_tools != tools: 248 # Mismatch found 249 has_tool_mismatch = True 250 tools = None 251 break 252 253 # If no valid runs were processed, return empty tools 254 if tools is None: 255 if not has_tool_mismatch: 256 tools = set() 257 258 return DatasetToolInfo( 259 has_tool_mismatch=has_tool_mismatch, 260 tools=None if tools is None else sorted(tools), 261 )
Compute tool info from a list of task runs.
Args: runs: list of task runs to analyze
Returns: DatasetToolInfo: information about tools used across the task runs
263 def tool_info(self) -> DatasetToolInfo: 264 """ 265 Helper method to compute tool info for the dataset split. Iterate through all runs in the dataset split and check the tools used in each run config. 266 267 Returns: 268 DatasetToolInfo: information about tools used across task runs in this dataset split 269 """ 270 runs = self._get_runs() 271 tool_info = self.compute_tool_info(runs) 272 return tool_info
Helper method to compute tool info for the dataset split. Iterate through all runs in the dataset split and check the tools used in each run config.
Returns: DatasetToolInfo: information about tools used across task runs in this dataset split
The type of the None singleton.
Configuration for the model; should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """This function is meant to behave like a BaseModel method to initialise private attributes.

    It takes context as an argument since that's what pydantic-core passes when calling it.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Only initialise once; an existing __pydantic_private__ dict is left untouched.
    if getattr(self, '__pydantic_private__', None) is None:
        pydantic_private = {}
        for name, private_attr in self.__private_attributes__.items():
            default = private_attr.get_default()
            # Attributes without a concrete default stay unset until first assignment.
            if default is not PydanticUndefined:
                pydantic_private[name] = default
        # object_setattr bypasses the model's own __setattr__ machinery.
        object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The initialization context passed by pydantic-core.
class DatasetSplitDefinition(BaseModel):
    """
    A definition of a split in a dataset.

    Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
    """

    name: FilenameString = Field(
        description="The name of the dataset split definition."
    )
    description: str | None = Field(
        default=None,
        description="A description of the dataset for you and your team. Not used in training.",
    )
    # Fraction of the dataset in this split; ge/le bounds enforce [0, 1].
    percentage: float = Field(
        ge=0.0,
        le=1.0,
        description="The percentage of the dataset that this split represents (between 0 and 1).",
    )
A definition of a split in a dataset.
Example: name="train", description="The training set", percentage=0.8 (80% of the dataset)
64class ExternalToolServer(KilnParentedModel): 65 """ 66 Configuration for communicating with a external MCP (Model Context Protocol) Server for LLM tool calls. External tool servers can be remote or local. 67 68 This model stores the necessary configuration to connect to and authenticate with 69 external MCP servers that provide tools for LLM interactions. 70 """ 71 72 name: FilenameString = Field(description="The name of the external tool.") 73 type: ToolServerType = Field( 74 description="The type of external tool server. Remote tools are hosted on a remote server", 75 ) 76 description: str | None = Field( 77 default=None, 78 description="A description of the external tool for you and your team. Will not be used in prompts/training/validation.", 79 ) 80 81 properties: ( 82 LocalServerProperties | RemoteServerProperties | KilnTaskServerProperties 83 ) = Field( 84 description="Configuration properties specific to the tool type.", 85 ) 86 87 # Private variable to store unsaved secrets 88 _unsaved_secrets: dict[str, str] = PrivateAttr(default_factory=dict) 89 90 def model_post_init(self, __context: Any) -> None: 91 # Process secrets after initialization (pydantic v2 hook) 92 self._process_secrets_from_properties() 93 94 def _process_secrets_from_properties(self) -> None: 95 """ 96 Extract secrets from properties and move them to _unsaved_secrets. 97 This removes secrets from the properties dict so they aren't saved to file. 98 Clears existing _unsaved_secrets first to handle property updates correctly. 
99 """ 100 # Clear existing unsaved secrets since we're reprocessing 101 self._unsaved_secrets.clear() 102 103 secret_keys = self.get_secret_keys() 104 105 if not secret_keys: 106 return 107 108 # Extract secret values from properties based on server type 109 match self.type: 110 case ToolServerType.remote_mcp: 111 headers = self.properties.get("headers", {}) 112 for key_name in secret_keys: 113 if key_name in headers: 114 self._unsaved_secrets[key_name] = headers[key_name] 115 # Remove from headers immediately so they are not saved to file 116 del headers[key_name] 117 118 case ToolServerType.local_mcp: 119 env_vars = self.properties.get("env_vars", {}) 120 for key_name in secret_keys: 121 if key_name in env_vars: 122 self._unsaved_secrets[key_name] = env_vars[key_name] 123 # Remove from env_vars immediately so they are not saved to file 124 del env_vars[key_name] 125 126 case ToolServerType.kiln_task: 127 pass 128 129 case _: 130 raise_exhaustive_enum_error(self.type) 131 132 def __setattr__(self, name: str, value: Any) -> None: 133 """ 134 Override __setattr__ to process secrets whenever properties are updated. 
135 """ 136 super().__setattr__(name, value) 137 138 # Process secrets whenever properties are updated 139 if name == "properties": 140 self._process_secrets_from_properties() 141 142 # Validation Helpers 143 144 @classmethod 145 def check_server_url(cls, server_url: str) -> None: 146 """Validate Server URL""" 147 if not isinstance(server_url, str): 148 raise ValueError("Server URL must be a string") 149 150 # Check for leading whitespace in URL 151 if server_url != server_url.lstrip(): 152 raise ValueError("Server URL must not have leading whitespace") 153 154 parsed_url = urlparse(server_url) 155 if not parsed_url.netloc: 156 raise ValueError("Server URL is not a valid URL") 157 if parsed_url.scheme not in ["http", "https"]: 158 raise ValueError("Server URL must start with http:// or https://") 159 160 @classmethod 161 def check_headers(cls, headers: dict) -> None: 162 """Validate Headers""" 163 if not isinstance(headers, dict): 164 raise ValueError("headers must be a dictionary") 165 166 for key, value in headers.items(): 167 if not key: 168 raise ValueError("Header name is required") 169 if not value: 170 raise ValueError("Header value is required") 171 172 # Reject invalid header names and CR/LF in names/values 173 token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$") 174 if not token_re.match(key): 175 raise ValueError(f'Invalid header name: "{key}"') 176 if re.search(r"\r|\n", key) or re.search(r"\r|\n", value): 177 raise ValueError( 178 "Header names/values must not contain invalid characters" 179 ) 180 181 @classmethod 182 def check_secret_keys( 183 cls, secret_keys: list, key_type: str, tool_type: str 184 ) -> None: 185 """Validate Secret Keys (generic method for both header and env var keys)""" 186 if not isinstance(secret_keys, list): 187 raise ValueError( 188 f"{key_type} must be a list for external tools of type '{tool_type}'" 189 ) 190 if not all(isinstance(k, str) for k in secret_keys): 191 raise ValueError(f"{key_type} must contain only strings") 
192 if not all(key for key in secret_keys): 193 raise ValueError("Secret key is required") 194 195 @classmethod 196 def check_env_vars(cls, env_vars: dict) -> None: 197 """Validate Environment Variables""" 198 if not isinstance(env_vars, dict): 199 raise ValueError("environment variables must be a dictionary") 200 201 # Validate env_vars keys are in the correct format for Environment Variables 202 # According to POSIX specification, environment variable names must: 203 # - Start with a letter (a-z, A-Z) or underscore (_) 204 # - Contain only ASCII letters, digits, and underscores 205 for key, _ in env_vars.items(): 206 if not key or not ( 207 key[0].isascii() and (key[0].isalpha() or key[0] == "_") 208 ): 209 raise ValueError( 210 f"Invalid environment variable key: {key}. Must start with a letter or underscore." 211 ) 212 213 if not all(c.isascii() and (c.isalnum() or c == "_") for c in key): 214 raise ValueError( 215 f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores." 216 ) 217 218 @classmethod 219 def type_from_data(cls, data: dict) -> ToolServerType: 220 """Get the tool server type from the data for the the validators""" 221 raw_type = data.get("type") 222 if raw_type is None: 223 raise ValueError("type is required") 224 try: 225 return ToolServerType(raw_type) 226 except ValueError: 227 valid_types = ", ".join(type.value for type in ToolServerType) 228 raise ValueError(f"type must be one of: {valid_types}") 229 230 @model_validator(mode="before") 231 def upgrade_old_properties(cls, data: dict) -> dict: 232 """ 233 Upgrade properties for backwards compatibility. 
234 """ 235 properties = data.get("properties") 236 if properties is not None and "is_archived" not in properties: 237 # Add is_archived field with default value back to data 238 properties["is_archived"] = False 239 data["properties"] = properties 240 return data 241 242 @model_validator(mode="before") 243 def validate_required_fields(cls, data: dict) -> dict: 244 """Validate that each tool type has the required configuration.""" 245 server_type = ExternalToolServer.type_from_data(data) 246 properties = data.get("properties", {}) 247 248 match server_type: 249 case ToolServerType.remote_mcp: 250 server_url = properties.get("server_url", None) 251 if server_url is None: 252 raise ValueError( 253 "Server URL is required to connect to a remote MCP server" 254 ) 255 ExternalToolServer.check_server_url(server_url) 256 257 case ToolServerType.local_mcp: 258 command = properties.get("command", None) 259 if command is None: 260 raise ValueError("command is required to start a local MCP server") 261 if not isinstance(command, str): 262 raise ValueError( 263 "command must be a string to start a local MCP server" 264 ) 265 # Reject empty/whitespace-only command strings 266 if command.strip() == "": 267 raise ValueError("command must be a non-empty string") 268 269 args = properties.get("args", None) 270 if args is not None: 271 if not isinstance(args, list): 272 raise ValueError( 273 "arguments must be a list to start a local MCP server" 274 ) 275 276 case ToolServerType.kiln_task: 277 tool_name_validator(properties.get("name", "")) 278 err_msg_prefix = "Kiln task server properties:" 279 validate_return_dict_prop( 280 properties, "description", str, err_msg_prefix 281 ) 282 description = properties.get("description", "") 283 if len(description) > 128: 284 raise ValueError("description must be 128 characters or less") 285 validate_return_dict_prop( 286 properties, "is_archived", bool, err_msg_prefix 287 ) 288 validate_return_dict_prop(properties, "task_id", str, 
err_msg_prefix) 289 validate_return_dict_prop( 290 properties, "run_config_id", str, err_msg_prefix 291 ) 292 293 case _: 294 # Type checking will catch missing cases 295 raise_exhaustive_enum_error(server_type) 296 return data 297 298 @model_validator(mode="before") 299 def validate_headers_and_env_vars(cls, data: dict) -> dict: 300 """ 301 Validate secrets, these needs to be validated before model initlization because secrets will be processed and stripped 302 """ 303 type = ExternalToolServer.type_from_data(data) 304 305 properties = data.get("properties", {}) 306 if properties is None: 307 raise ValueError("properties is required") 308 309 match type: 310 case ToolServerType.remote_mcp: 311 # Validate headers 312 headers = properties.get("headers", None) 313 if headers is not None: 314 ExternalToolServer.check_headers(headers) 315 316 # Secret header keys are optional, validate if they are set 317 secret_header_keys = properties.get("secret_header_keys", None) 318 if secret_header_keys is not None: 319 ExternalToolServer.check_secret_keys( 320 secret_header_keys, "secret_header_keys", "remote_mcp" 321 ) 322 323 case ToolServerType.local_mcp: 324 # Validate secret environment variable keys 325 env_vars = properties.get("env_vars", {}) 326 if env_vars is not None: 327 ExternalToolServer.check_env_vars(env_vars) 328 329 # Secret env var keys are optional, but if they are set, they must be a list of strings 330 secret_env_var_keys = properties.get("secret_env_var_keys", None) 331 if secret_env_var_keys is not None: 332 ExternalToolServer.check_secret_keys( 333 secret_env_var_keys, "secret_env_var_keys", "local_mcp" 334 ) 335 336 case ToolServerType.kiln_task: 337 pass 338 339 case _: 340 raise_exhaustive_enum_error(type) 341 342 return data 343 344 def get_secret_keys(self) -> list[str]: 345 """ 346 Get the list of secret key names based on server type. 
347 348 Returns: 349 List of secret key names (header names for remote, env var names for local) 350 """ 351 match self.type: 352 case ToolServerType.remote_mcp: 353 return self.properties.get("secret_header_keys", []) 354 case ToolServerType.local_mcp: 355 return self.properties.get("secret_env_var_keys", []) 356 case ToolServerType.kiln_task: 357 return [] 358 case _: 359 raise_exhaustive_enum_error(self.type) 360 361 def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]: 362 """ 363 Retrieve secrets from configuration system or in-memory storage. 364 Automatically determines which secret keys to retrieve based on the server type. 365 Config secrets take precedence over unsaved secrets. 366 367 Returns: 368 Tuple of (secrets_dict, missing_secrets_list) where: 369 - secrets_dict: Dictionary mapping key names to their secret values 370 - missing_secrets_list: List of secret key names that are missing values 371 """ 372 secrets = {} 373 missing_secrets = [] 374 secret_keys = self.get_secret_keys() 375 376 if secret_keys and len(secret_keys) > 0: 377 config = Config.shared() 378 mcp_secrets = config.get_value(MCP_SECRETS_KEY) 379 380 for key_name in secret_keys: 381 secret_value = None 382 383 # First check config secrets (persistent storage), key is mcp_server_id::key_name 384 secret_key = self._config_secret_key(key_name) 385 secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None 386 387 # Fall back to unsaved secrets (in-memory storage) 388 if ( 389 not secret_value 390 and hasattr(self, "_unsaved_secrets") 391 and key_name in self._unsaved_secrets 392 ): 393 secret_value = self._unsaved_secrets[key_name] 394 395 if secret_value: 396 secrets[key_name] = secret_value 397 else: 398 missing_secrets.append(key_name) 399 400 return secrets, missing_secrets 401 402 def _save_secrets(self) -> None: 403 """ 404 Save unsaved secrets to the configuration system. 
405 """ 406 secret_keys = self.get_secret_keys() 407 408 # No secrets to save 409 if not secret_keys: 410 return 411 412 if self.id is None: 413 raise ValueError("Server ID cannot be None when saving secrets") 414 415 # Check if secrets are already saved 416 if not hasattr(self, "_unsaved_secrets") or not self._unsaved_secrets: 417 return 418 419 config = Config.shared() 420 mcp_secrets: dict[str, str] = config.get_value(MCP_SECRETS_KEY) or {} 421 422 # Store secrets with the pattern: mcp_server_id::key_name 423 for key_name, secret_value in self._unsaved_secrets.items(): 424 secret_key = self._config_secret_key(key_name) 425 mcp_secrets[secret_key] = secret_value 426 427 config.update_settings({MCP_SECRETS_KEY: mcp_secrets}) 428 429 # Clear unsaved secrets after saving 430 self._unsaved_secrets.clear() 431 432 def delete_secrets(self) -> None: 433 """ 434 Delete all secrets for this tool server from the configuration system. 435 """ 436 secret_keys = self.get_secret_keys() 437 438 config = Config.shared() 439 mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]() 440 441 # Remove secrets with the pattern: mcp_server_id::key_name 442 for key_name in secret_keys: 443 secret_key = self._config_secret_key(key_name) 444 if secret_key in mcp_secrets: 445 del mcp_secrets[secret_key] 446 447 # Always call update_settings to maintain consistency with the old behavior 448 config.update_settings({MCP_SECRETS_KEY: mcp_secrets}) 449 450 def save_to_file(self) -> None: 451 """ 452 Override save_to_file to automatically save any unsaved secrets before saving to file. 453 454 This ensures that secrets are always saved when the object is saved, 455 preventing the issue where secrets could be lost if save_to_file is called 456 without explicitly saving secrets first. 
457 """ 458 # Save any unsaved secrets first 459 if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets: 460 self._save_secrets() 461 462 # Call the parent save_to_file method 463 super().save_to_file() 464 465 # Internal helpers 466 467 def _config_secret_key(self, key_name: str) -> str: 468 """ 469 Generate the secret key pattern for storing/retrieving secrets. 470 471 Args: 472 key_name: The name of the secret key 473 474 Returns: 475 The formatted secret key: "{server_id}::{key_name}" 476 """ 477 return f"{self.id}::{key_name}"
Configuration for communicating with an external MCP (Model Context Protocol) Server for LLM tool calls. External tool servers can be remote or local.
This model stores the necessary configuration to connect to and authenticate with external MCP servers that provide tools for LLM interactions.
90 def model_post_init(self, __context: Any) -> None: 91 # Process secrets after initialization (pydantic v2 hook) 92 self._process_secrets_from_properties()
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
144 @classmethod 145 def check_server_url(cls, server_url: str) -> None: 146 """Validate Server URL""" 147 if not isinstance(server_url, str): 148 raise ValueError("Server URL must be a string") 149 150 # Check for leading whitespace in URL 151 if server_url != server_url.lstrip(): 152 raise ValueError("Server URL must not have leading whitespace") 153 154 parsed_url = urlparse(server_url) 155 if not parsed_url.netloc: 156 raise ValueError("Server URL is not a valid URL") 157 if parsed_url.scheme not in ["http", "https"]: 158 raise ValueError("Server URL must start with http:// or https://")
Validate Server URL
160 @classmethod 161 def check_headers(cls, headers: dict) -> None: 162 """Validate Headers""" 163 if not isinstance(headers, dict): 164 raise ValueError("headers must be a dictionary") 165 166 for key, value in headers.items(): 167 if not key: 168 raise ValueError("Header name is required") 169 if not value: 170 raise ValueError("Header value is required") 171 172 # Reject invalid header names and CR/LF in names/values 173 token_re = re.compile(r"^[!#$%&'*+.^_`|~0-9A-Za-z-]+$") 174 if not token_re.match(key): 175 raise ValueError(f'Invalid header name: "{key}"') 176 if re.search(r"\r|\n", key) or re.search(r"\r|\n", value): 177 raise ValueError( 178 "Header names/values must not contain invalid characters" 179 )
Validate Headers
181 @classmethod 182 def check_secret_keys( 183 cls, secret_keys: list, key_type: str, tool_type: str 184 ) -> None: 185 """Validate Secret Keys (generic method for both header and env var keys)""" 186 if not isinstance(secret_keys, list): 187 raise ValueError( 188 f"{key_type} must be a list for external tools of type '{tool_type}'" 189 ) 190 if not all(isinstance(k, str) for k in secret_keys): 191 raise ValueError(f"{key_type} must contain only strings") 192 if not all(key for key in secret_keys): 193 raise ValueError("Secret key is required")
Validate Secret Keys (generic method for both header and env var keys)
195 @classmethod 196 def check_env_vars(cls, env_vars: dict) -> None: 197 """Validate Environment Variables""" 198 if not isinstance(env_vars, dict): 199 raise ValueError("environment variables must be a dictionary") 200 201 # Validate env_vars keys are in the correct format for Environment Variables 202 # According to POSIX specification, environment variable names must: 203 # - Start with a letter (a-z, A-Z) or underscore (_) 204 # - Contain only ASCII letters, digits, and underscores 205 for key, _ in env_vars.items(): 206 if not key or not ( 207 key[0].isascii() and (key[0].isalpha() or key[0] == "_") 208 ): 209 raise ValueError( 210 f"Invalid environment variable key: {key}. Must start with a letter or underscore." 211 ) 212 213 if not all(c.isascii() and (c.isalnum() or c == "_") for c in key): 214 raise ValueError( 215 f"Invalid environment variable key: {key}. Can only contain letters, digits, and underscores." 216 )
Validate Environment Variables
218 @classmethod 219 def type_from_data(cls, data: dict) -> ToolServerType: 220 """Get the tool server type from the data for the the validators""" 221 raw_type = data.get("type") 222 if raw_type is None: 223 raise ValueError("type is required") 224 try: 225 return ToolServerType(raw_type) 226 except ValueError: 227 valid_types = ", ".join(type.value for type in ToolServerType) 228 raise ValueError(f"type must be one of: {valid_types}")
Get the tool server type from the data for the validators
230 @model_validator(mode="before") 231 def upgrade_old_properties(cls, data: dict) -> dict: 232 """ 233 Upgrade properties for backwards compatibility. 234 """ 235 properties = data.get("properties") 236 if properties is not None and "is_archived" not in properties: 237 # Add is_archived field with default value back to data 238 properties["is_archived"] = False 239 data["properties"] = properties 240 return data
Upgrade properties for backwards compatibility.
242 @model_validator(mode="before") 243 def validate_required_fields(cls, data: dict) -> dict: 244 """Validate that each tool type has the required configuration.""" 245 server_type = ExternalToolServer.type_from_data(data) 246 properties = data.get("properties", {}) 247 248 match server_type: 249 case ToolServerType.remote_mcp: 250 server_url = properties.get("server_url", None) 251 if server_url is None: 252 raise ValueError( 253 "Server URL is required to connect to a remote MCP server" 254 ) 255 ExternalToolServer.check_server_url(server_url) 256 257 case ToolServerType.local_mcp: 258 command = properties.get("command", None) 259 if command is None: 260 raise ValueError("command is required to start a local MCP server") 261 if not isinstance(command, str): 262 raise ValueError( 263 "command must be a string to start a local MCP server" 264 ) 265 # Reject empty/whitespace-only command strings 266 if command.strip() == "": 267 raise ValueError("command must be a non-empty string") 268 269 args = properties.get("args", None) 270 if args is not None: 271 if not isinstance(args, list): 272 raise ValueError( 273 "arguments must be a list to start a local MCP server" 274 ) 275 276 case ToolServerType.kiln_task: 277 tool_name_validator(properties.get("name", "")) 278 err_msg_prefix = "Kiln task server properties:" 279 validate_return_dict_prop( 280 properties, "description", str, err_msg_prefix 281 ) 282 description = properties.get("description", "") 283 if len(description) > 128: 284 raise ValueError("description must be 128 characters or less") 285 validate_return_dict_prop( 286 properties, "is_archived", bool, err_msg_prefix 287 ) 288 validate_return_dict_prop(properties, "task_id", str, err_msg_prefix) 289 validate_return_dict_prop( 290 properties, "run_config_id", str, err_msg_prefix 291 ) 292 293 case _: 294 # Type checking will catch missing cases 295 raise_exhaustive_enum_error(server_type) 296 return data
Validate that each tool type has the required configuration.
298 @model_validator(mode="before") 299 def validate_headers_and_env_vars(cls, data: dict) -> dict: 300 """ 301 Validate secrets, these needs to be validated before model initlization because secrets will be processed and stripped 302 """ 303 type = ExternalToolServer.type_from_data(data) 304 305 properties = data.get("properties", {}) 306 if properties is None: 307 raise ValueError("properties is required") 308 309 match type: 310 case ToolServerType.remote_mcp: 311 # Validate headers 312 headers = properties.get("headers", None) 313 if headers is not None: 314 ExternalToolServer.check_headers(headers) 315 316 # Secret header keys are optional, validate if they are set 317 secret_header_keys = properties.get("secret_header_keys", None) 318 if secret_header_keys is not None: 319 ExternalToolServer.check_secret_keys( 320 secret_header_keys, "secret_header_keys", "remote_mcp" 321 ) 322 323 case ToolServerType.local_mcp: 324 # Validate secret environment variable keys 325 env_vars = properties.get("env_vars", {}) 326 if env_vars is not None: 327 ExternalToolServer.check_env_vars(env_vars) 328 329 # Secret env var keys are optional, but if they are set, they must be a list of strings 330 secret_env_var_keys = properties.get("secret_env_var_keys", None) 331 if secret_env_var_keys is not None: 332 ExternalToolServer.check_secret_keys( 333 secret_env_var_keys, "secret_env_var_keys", "local_mcp" 334 ) 335 336 case ToolServerType.kiln_task: 337 pass 338 339 case _: 340 raise_exhaustive_enum_error(type) 341 342 return data
Validate secrets. These need to be validated before model initialization because secrets will be processed and stripped.
344 def get_secret_keys(self) -> list[str]: 345 """ 346 Get the list of secret key names based on server type. 347 348 Returns: 349 List of secret key names (header names for remote, env var names for local) 350 """ 351 match self.type: 352 case ToolServerType.remote_mcp: 353 return self.properties.get("secret_header_keys", []) 354 case ToolServerType.local_mcp: 355 return self.properties.get("secret_env_var_keys", []) 356 case ToolServerType.kiln_task: 357 return [] 358 case _: 359 raise_exhaustive_enum_error(self.type)
Get the list of secret key names based on server type.
Returns: List of secret key names (header names for remote, env var names for local)
361 def retrieve_secrets(self) -> tuple[dict[str, str], list[str]]: 362 """ 363 Retrieve secrets from configuration system or in-memory storage. 364 Automatically determines which secret keys to retrieve based on the server type. 365 Config secrets take precedence over unsaved secrets. 366 367 Returns: 368 Tuple of (secrets_dict, missing_secrets_list) where: 369 - secrets_dict: Dictionary mapping key names to their secret values 370 - missing_secrets_list: List of secret key names that are missing values 371 """ 372 secrets = {} 373 missing_secrets = [] 374 secret_keys = self.get_secret_keys() 375 376 if secret_keys and len(secret_keys) > 0: 377 config = Config.shared() 378 mcp_secrets = config.get_value(MCP_SECRETS_KEY) 379 380 for key_name in secret_keys: 381 secret_value = None 382 383 # First check config secrets (persistent storage), key is mcp_server_id::key_name 384 secret_key = self._config_secret_key(key_name) 385 secret_value = mcp_secrets.get(secret_key) if mcp_secrets else None 386 387 # Fall back to unsaved secrets (in-memory storage) 388 if ( 389 not secret_value 390 and hasattr(self, "_unsaved_secrets") 391 and key_name in self._unsaved_secrets 392 ): 393 secret_value = self._unsaved_secrets[key_name] 394 395 if secret_value: 396 secrets[key_name] = secret_value 397 else: 398 missing_secrets.append(key_name) 399 400 return secrets, missing_secrets
Retrieve secrets from configuration system or in-memory storage. Automatically determines which secret keys to retrieve based on the server type. Config secrets take precedence over unsaved secrets.
Returns: Tuple of (secrets_dict, missing_secrets_list) where: - secrets_dict: Dictionary mapping key names to their secret values - missing_secrets_list: List of secret key names that are missing values
432 def delete_secrets(self) -> None: 433 """ 434 Delete all secrets for this tool server from the configuration system. 435 """ 436 secret_keys = self.get_secret_keys() 437 438 config = Config.shared() 439 mcp_secrets = config.get_value(MCP_SECRETS_KEY) or dict[str, str]() 440 441 # Remove secrets with the pattern: mcp_server_id::key_name 442 for key_name in secret_keys: 443 secret_key = self._config_secret_key(key_name) 444 if secret_key in mcp_secrets: 445 del mcp_secrets[secret_key] 446 447 # Always call update_settings to maintain consistency with the old behavior 448 config.update_settings({MCP_SECRETS_KEY: mcp_secrets})
Delete all secrets for this tool server from the configuration system.
450 def save_to_file(self) -> None: 451 """ 452 Override save_to_file to automatically save any unsaved secrets before saving to file. 453 454 This ensures that secrets are always saved when the object is saved, 455 preventing the issue where secrets could be lost if save_to_file is called 456 without explicitly saving secrets first. 457 """ 458 # Save any unsaved secrets first 459 if hasattr(self, "_unsaved_secrets") and self._unsaved_secrets: 460 self._save_secrets() 461 462 # Call the parent save_to_file method 463 super().save_to_file()
Override save_to_file to automatically save any unsaved secrets before saving to file.
This ensures that secrets are always saved when the object is saved, preventing the issue where secrets could be lost if save_to_file is called without explicitly saving secrets first.
8class Feedback(KilnParentedModel): 9 """Feedback on a task run. 10 11 Supports multi-source feedback: different users, automated systems, and 12 different locations in the UI can each contribute independent feedback 13 entries on the same task run. 14 """ 15 16 feedback: str = Field( 17 min_length=1, 18 description="Free-form text feedback on the task run.", 19 ) 20 source: FeedbackSource = Field( 21 description="Where this feedback originated, e.g. 'run-page' or 'spec-feedback'.", 22 )
Feedback on a task run.
Supports multi-source feedback: different users, automated systems, and different locations in the UI can each contribute independent feedback entries on the same task run.
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
85class FeedbackSource(str, Enum): 86 """Where a piece of feedback originated. 87 88 This is an append-only enum: new sources can be added freely, but existing 89 values must never be removed or renamed so that older persisted data 90 continues to load. 91 """ 92 93 run_page = "run-page" 94 spec_feedback = "spec-feedback"
Where a piece of feedback originated.
This is an append-only enum: new sources can be added freely, but existing values must never be removed or renamed so that older persisted data continues to load.
53class FineTuneStatusType(str, Enum): 54 """ 55 The status type of a fine-tune job. 56 """ 57 58 unknown = "unknown" 59 pending = "pending" 60 running = "running" 61 completed = "completed" 62 failed = "failed"
The status type of a fine-tune job.
24class Finetune(KilnParentedModel): 25 """ 26 The Kiln fine-tune datamodel. 27 28 Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID. 29 """ 30 31 name: FilenameString = Field(description="The name of the fine-tune.") 32 description: str | None = Field( 33 default=None, 34 description="A description of the fine-tune for you and your team. Not used in training.", 35 ) 36 structured_output_mode: StructuredOutputMode | None = Field( 37 default=None, 38 description="Legacy field -- replaced by run_config.structured_output_mode. The mode to use to train the model for structured output, if it was trained with structured output. We should call the tuned model with this mode if set.", 39 ) 40 provider: str = Field( 41 description="The provider to use for the fine-tune (e.g. 'openai')." 42 ) 43 base_model_id: str = Field( 44 description="The id of the base model to use for the fine-tune. This string relates to the provider's IDs for their own models, not Kiln IDs." 45 ) 46 provider_id: str | None = Field( 47 default=None, 48 description="The ID of the fine-tune job on the provider's side. May not be the same as the fine_tune_model_id.", 49 ) 50 fine_tune_model_id: str | None = Field( 51 default=None, 52 description="The ID of the fine-tuned model on the provider's side. May not be the same as the provider_id.", 53 ) 54 dataset_split_id: str = Field( 55 description="The ID of the dataset split to use for this fine-tune.", 56 ) 57 train_split_name: str = Field( 58 default="train", 59 description="The name of the training split to use for this fine-tune.", 60 ) 61 validation_split_name: str | None = Field( 62 default=None, 63 description="The name of the validation split to use for this fine-tune. Optional.", 64 ) 65 parameters: dict[str, str | int | float | bool] = Field( 66 default={}, 67 description="The parameters to use for this fine-tune. 
These are provider-specific.", 68 ) 69 # These two fields are saved exactly used for training. Even if they map exactly to a custom prompt or generator, those can change, so we want to keep a record of the training prompt. 70 system_message: str = Field( 71 description="The system message to use for this fine-tune.", 72 ) 73 thinking_instructions: str | None = Field( 74 default=None, 75 description="The thinking instructions to use for this fine-tune. Only used when data_strategy is final_and_intermediate.", 76 ) 77 latest_status: FineTuneStatusType = Field( 78 default=FineTuneStatusType.unknown, 79 description="The latest known status of this fine-tune. Not updated in real time.", 80 ) 81 properties: Dict[str, str | int | float] = Field( 82 default={}, 83 description="Properties of the fine-tune. Different providers may use different properties.", 84 ) 85 data_strategy: ChatStrategy = Field( 86 default=ChatStrategy.single_turn, 87 description="The strategy to use for training the model. 'final_only' will only train on the final response. 
'final_and_intermediate' will train on the final response and intermediate outputs (chain of thought or reasoning).", 88 ) 89 run_config: KilnAgentRunConfigProperties | None = Field( 90 default=None, 91 description="The run configuration for this fine-tune.", 92 ) 93 94 # Workaround to return typed parent without importing Task 95 def parent_task(self) -> Union["Task", None]: 96 if self.parent is None or self.parent.__class__.__name__ != "Task": 97 return None 98 return self.parent # type: ignore 99 100 def nested_id(self) -> str: 101 """ 102 Build the nested ID for this finetune in the format: project_id::task_id::finetune_id 103 """ 104 task = self.parent_task() 105 if task is None: 106 raise ValueError("Finetune must have a parent task") 107 project = task.parent_project() 108 if project is None: 109 raise ValueError("Finetune must have a parent project") 110 return f"{project.id}::{task.id}::{self.id}" 111 112 @model_validator(mode="after") 113 def validate_thinking_instructions(self) -> Self: 114 if ( 115 self.thinking_instructions is not None 116 and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS 117 ): 118 raise ValueError( 119 f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}" 120 ) 121 if ( 122 self.thinking_instructions is None 123 and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS 124 ): 125 raise ValueError( 126 f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}" 127 ) 128 return self
The Kiln fine-tune datamodel.
Initially holds a reference to a training job, with needed identifiers to update the status. When complete, contains the new model ID.
100 def nested_id(self) -> str: 101 """ 102 Build the nested ID for this finetune in the format: project_id::task_id::finetune_id 103 """ 104 task = self.parent_task() 105 if task is None: 106 raise ValueError("Finetune must have a parent task") 107 project = task.parent_project() 108 if project is None: 109 raise ValueError("Finetune must have a parent project") 110 return f"{project.id}::{task.id}::{self.id}"
Build the nested ID for this finetune in the format: project_id::task_id::finetune_id
112 @model_validator(mode="after") 113 def validate_thinking_instructions(self) -> Self: 114 if ( 115 self.thinking_instructions is not None 116 and self.data_strategy not in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS 117 ): 118 raise ValueError( 119 f"Thinking instructions can only be used when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}" 120 ) 121 if ( 122 self.thinking_instructions is None 123 and self.data_strategy in DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS 124 ): 125 raise ValueError( 126 f"Thinking instructions are required when data_strategy is one of the following: {DATA_STRATIGIES_REQUIRED_THINKING_INSTRUCTIONS}" 127 ) 128 return self
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
9class Priority(IntEnum): 10 """Priority levels, where P0 is highest priority.""" 11 12 p0 = 0 13 p1 = 1 14 p2 = 2 15 p3 = 3
Priority levels, where P0 is highest priority.
16class Project( 17 KilnParentModel, 18 parent_of={ 19 "tasks": Task, 20 "documents": Document, 21 "extractor_configs": ExtractorConfig, 22 "chunker_configs": ChunkerConfig, 23 "embedding_configs": EmbeddingConfig, 24 "rag_configs": RagConfig, 25 "vector_store_configs": VectorStoreConfig, 26 "external_tool_servers": ExternalToolServer, 27 "reranker_configs": RerankerConfig, 28 "skills": Skill, 29 }, 30): 31 """ 32 A collection of related tasks. 33 34 Projects organize tasks into logical groups and provide high-level descriptions 35 of the overall goals. 36 """ 37 38 name: FilenameString = Field(description="The name of the project.") 39 description: str | None = Field( 40 default=None, 41 description="A description of the project for you and your team. Will not be used in prompts/training/validation.", 42 ) 43 44 # Needed for typechecking. We should fix this in KilnParentModel 45 def tasks(self, readonly: bool = False) -> list[Task]: 46 return super().tasks(readonly=readonly) # type: ignore 47 48 def documents(self, readonly: bool = False) -> list[Document]: 49 return super().documents(readonly=readonly) # type: ignore 50 51 def extractor_configs(self, readonly: bool = False) -> list[ExtractorConfig]: 52 return super().extractor_configs(readonly=readonly) # type: ignore 53 54 def chunker_configs(self, readonly: bool = False) -> list[ChunkerConfig]: 55 return super().chunker_configs(readonly=readonly) # type: ignore 56 57 def embedding_configs(self, readonly: bool = False) -> list[EmbeddingConfig]: 58 return super().embedding_configs(readonly=readonly) # type: ignore 59 60 def vector_store_configs(self, readonly: bool = False) -> list[VectorStoreConfig]: 61 return super().vector_store_configs(readonly=readonly) # type: ignore 62 63 def rag_configs(self, readonly: bool = False) -> list[RagConfig]: 64 return super().rag_configs(readonly=readonly) # type: ignore 65 66 def external_tool_servers(self, readonly: bool = False) -> list[ExternalToolServer]: 67 return 
super().external_tool_servers(readonly=readonly) # type: ignore 68 69 def reranker_configs(self, readonly: bool = False) -> list[RerankerConfig]: 70 return super().reranker_configs(readonly=readonly) # type: ignore 71 72 def skills(self, readonly: bool = False) -> list[Skill]: 73 return super().skills(readonly=readonly) # type: ignore
A collection of related tasks.
Projects organize tasks into logical groups and provide high-level descriptions of the overall goals.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
33class Prompt(KilnParentedModel, BasePrompt): 34 """ 35 A prompt for a task. This is the custom prompt parented by a task. 36 """ 37 38 pass
A prompt for a task. This is the custom prompt parented by a task.
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
9class PromptGenerators(str, Enum): 10 """Built-in prompt generators that can construct a prompt from a task definition.""" 11 12 SIMPLE = "simple_prompt_builder" 13 MULTI_SHOT = "multi_shot_prompt_builder" 14 FEW_SHOT = "few_shot_prompt_builder" 15 REPAIRS = "repairs_prompt_builder" 16 SIMPLE_CHAIN_OF_THOUGHT = "simple_chain_of_thought_prompt_builder" 17 FEW_SHOT_CHAIN_OF_THOUGHT = "few_shot_chain_of_thought_prompt_builder" 18 MULTI_SHOT_CHAIN_OF_THOUGHT = "multi_shot_chain_of_thought_prompt_builder"
Built-in prompt generators that can construct a prompt from a task definition.
12class PromptOptimizationJob(KilnParentedModel): 13 """ 14 The Kiln prompt optimization job datamodel. 15 """ 16 17 name: FilenameString = Field(description="The name of the prompt optimization job.") 18 description: str | None = Field( 19 default=None, 20 description="A description of the prompt optimization job for you and your team.", 21 ) 22 job_id: str = Field(description="The ID of the job on the remote Kiln server.") 23 target_run_config_id: str = Field( 24 description="The ID of the run configuration used for this job." 25 ) 26 latest_status: str = Field( 27 default="pending", 28 description="The latest known status of this prompt optimization job (pending, running, succeeded, failed, cancelled). Not updated in real time.", 29 ) 30 optimized_prompt: str | None = Field( 31 default=None, 32 description="The optimized prompt result when the job succeeds.", 33 ) 34 created_prompt_id: str | None = Field( 35 default=None, 36 description="The ID of the prompt created from this job's result, if any.", 37 ) 38 created_run_config_id: str | None = Field( 39 default=None, 40 description="The ID of the run config created from this job's result, if any.", 41 ) 42 eval_ids: list[str] = Field( 43 default_factory=list, 44 description="List of eval IDs used for this job.", 45 ) 46 47 def parent_task(self) -> "Task | None": 48 """Get the parent task, with proper typing.""" 49 if self.parent is None or self.parent.__class__.__name__ != "Task": 50 return None 51 return self.parent # type: ignore
The Kiln prompt optimization job datamodel.
47 def parent_task(self) -> "Task | None": 48 """Get the parent task, with proper typing.""" 49 if self.parent is None or self.parent.__class__.__name__ != "Task": 50 return None 51 return self.parent # type: ignore
Get the parent task, with proper typing.
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
20class RequirementRating(BaseModel): 21 """Rating for a specific requirement within a task output.""" 22 23 value: float = Field( 24 description="The rating value. Interpretation depends on rating type" 25 ) 26 type: TaskOutputRatingType = Field(description="The type of rating")
Rating for a specific requirement within a task output.
20class Skill(KilnParentedModel): 21 """A Skill represents reusable agent instructions following the agentskills.io specification. 22 23 Skills are project-level resources that can be attached to run configs. 24 The agent discovers available skills via the skill tool description, then 25 loads a skill's body on demand by calling skill(name="skill_name"). 26 27 The skill's body (markdown instructions) is stored in a SKILL.md sidecar file 28 rather than in skill.kiln, following the agentskills.io spec. 29 """ 30 31 name: SkillNameString = Field( 32 description="Skill name. Kebab-case: lowercase alphanumeric with hyphens.", 33 ) 34 description: str = Field( 35 description="Description of what the skill does and when to use it.", 36 min_length=1, 37 max_length=1024, 38 ) 39 is_archived: bool = Field( 40 default=False, 41 description="Whether the skill is archived. Archived skills are hidden from the UI and not available for use.", 42 ) 43 44 def parent_project(self) -> Union["Project", None]: 45 if self.parent is None or self.parent.__class__.__name__ != "Project": 46 return None 47 return self.parent # type: ignore 48 49 def skill_md_path(self) -> Path: 50 """Path to the SKILL.md sidecar file (sibling of skill.kiln).""" 51 if self.path is None: 52 raise ValueError("Skill must be saved before accessing SKILL.md path") 53 return self.path.parent / SKILL_MD_FILENAME 54 55 def skill_md_raw(self) -> str: 56 """Read the full SKILL.md file content (frontmatter + body).""" 57 md_path = self.skill_md_path() 58 if not md_path.exists(): 59 raise FileNotFoundError(f"SKILL.md not found at {md_path}") 60 if md_path.is_dir(): 61 raise FileNotFoundError(f"SKILL.md path is a folder, not a file: {md_path}") 62 return md_path.read_text(encoding="utf-8") 63 64 def body(self) -> str: 65 """Read the markdown body from SKILL.md (content after YAML frontmatter).""" 66 return _parse_skill_md_body(self.skill_md_raw()) 67 68 # -- Resources (references & assets) -- 69 70 def references_dir(self) 
-> Path: 71 if self.path is None: 72 raise ValueError( 73 "Skill must be saved before accessing references directory" 74 ) 75 return self.path.parent / "references" 76 77 def assets_dir(self) -> Path: 78 if self.path is None: 79 raise ValueError("Skill must be saved before accessing assets directory") 80 return self.path.parent / "assets" 81 82 def read_reference(self, relative_path: str) -> str: 83 """Read a reference file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.""" 84 return self._read_resource(self.references_dir(), relative_path) 85 86 def read_asset(self, relative_path: str) -> str: 87 """Read an asset file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.""" 88 return self._read_resource(self.assets_dir(), relative_path) 89 90 def _read_resource(self, base_dir: Path, relative_path: str) -> str: 91 """Read a resource file, validating it resolves within base_dir and is readable text.""" 92 if not relative_path or not relative_path.strip(): 93 raise ValueError("Path cannot be empty") 94 95 target = base_dir / relative_path 96 try: 97 resolved = target.resolve() 98 resolved.relative_to(base_dir.resolve()) 99 except ValueError: 100 raise ValueError("Path traversal is not allowed") from None 101 102 if resolved.is_dir(): 103 raise ValueError(f"Path is a folder, not a file: {relative_path}") 104 105 try: 106 return resolved.read_text(encoding="utf-8") 107 except FileNotFoundError: 108 raise FileNotFoundError( 109 f"Resource file not found: {relative_path}" 110 ) from None 111 except UnicodeDecodeError: 112 raise ValueError( 113 f"File is not a readable text file: {relative_path}" 114 ) from None 115 116 def save_skill_md(self, body: str) -> None: 117 """Write SKILL.md with YAML frontmatter (name, description) + markdown body. 118 119 Reads name and description from self to keep SKILL.md in sync with skill.kiln. 
120 """ 121 if not body or not body.strip(): 122 raise ValueError("body must be non-empty") 123 frontmatter = yaml.dump( 124 {"name": self.name, "description": self.description}, 125 default_flow_style=False, 126 allow_unicode=True, 127 sort_keys=False, 128 ).rstrip("\n") 129 content = f"---\n{frontmatter}\n---\n\n{body}" 130 self.skill_md_path().write_text(content, encoding="utf-8") 131 self.references_dir().mkdir(exist_ok=True) 132 self.assets_dir().mkdir(exist_ok=True)
A Skill represents reusable agent instructions following the agentskills.io specification.
Skills are project-level resources that can be attached to run configs. The agent discovers available skills via the skill tool description, then loads a skill's body on demand by calling skill(name="skill_name").
The skill's body (markdown instructions) is stored in a SKILL.md sidecar file rather than in skill.kiln, following the agentskills.io spec.
49 def skill_md_path(self) -> Path: 50 """Path to the SKILL.md sidecar file (sibling of skill.kiln).""" 51 if self.path is None: 52 raise ValueError("Skill must be saved before accessing SKILL.md path") 53 return self.path.parent / SKILL_MD_FILENAME
Path to the SKILL.md sidecar file (sibling of skill.kiln).
55 def skill_md_raw(self) -> str: 56 """Read the full SKILL.md file content (frontmatter + body).""" 57 md_path = self.skill_md_path() 58 if not md_path.exists(): 59 raise FileNotFoundError(f"SKILL.md not found at {md_path}") 60 if md_path.is_dir(): 61 raise FileNotFoundError(f"SKILL.md path is a folder, not a file: {md_path}") 62 return md_path.read_text(encoding="utf-8")
Read the full SKILL.md file content (frontmatter + body).
64 def body(self) -> str: 65 """Read the markdown body from SKILL.md (content after YAML frontmatter).""" 66 return _parse_skill_md_body(self.skill_md_raw())
Read the markdown body from SKILL.md (content after YAML frontmatter).
82 def read_reference(self, relative_path: str) -> str: 83 """Read a reference file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.""" 84 return self._read_resource(self.references_dir(), relative_path)
Read a reference file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.
86 def read_asset(self, relative_path: str) -> str: 87 """Read an asset file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.""" 88 return self._read_resource(self.assets_dir(), relative_path)
Read an asset file. Raises ValueError for path traversal, non-text, or if the path is a folder, FileNotFoundError if missing.
116 def save_skill_md(self, body: str) -> None: 117 """Write SKILL.md with YAML frontmatter (name, description) + markdown body. 118 119 Reads name and description from self to keep SKILL.md in sync with skill.kiln. 120 """ 121 if not body or not body.strip(): 122 raise ValueError("body must be non-empty") 123 frontmatter = yaml.dump( 124 {"name": self.name, "description": self.description}, 125 default_flow_style=False, 126 allow_unicode=True, 127 sort_keys=False, 128 ).rstrip("\n") 129 content = f"---\n{frontmatter}\n---\n\n{body}" 130 self.skill_md_path().write_text(content, encoding="utf-8") 131 self.references_dir().mkdir(exist_ok=True) 132 self.assets_dir().mkdir(exist_ok=True)
Write SKILL.md with YAML frontmatter (name, description) + markdown body.
Reads name and description from self to keep SKILL.md in sync with skill.kiln.
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
28class StructuredOutputMode(str, Enum): 29 """ 30 Enumeration of supported structured output modes. 31 32 - json_schema: request json using API capabilities for json_schema 33 - function_calling: request json using API capabilities for function calling 34 - json_mode: request json using API's JSON mode, which should return valid JSON, but isn't checking/passing the schema 35 - json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser on these models as they will be returning strings. 36 - json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries). 37 - json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions. 38 - default: let the adapter decide (legacy, do not use for new use cases) 39 - unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should lookup best option at runtime. 40 """ 41 42 default = "default" 43 json_schema = "json_schema" 44 function_calling_weak = "function_calling_weak" 45 function_calling = "function_calling" 46 json_mode = "json_mode" 47 json_instructions = "json_instructions" 48 json_instruction_and_object = "json_instruction_and_object" 49 json_custom_instructions = "json_custom_instructions" 50 unknown = "unknown"
Enumeration of supported structured output modes.
- json_schema: request json using API capabilities for json_schema
- function_calling: request json using API capabilities for function calling
- json_mode: request json using the API's JSON mode, which should return valid JSON, but does not check the output against the schema
- json_instructions: append instructions to the prompt to request json matching the schema. No API capabilities are used. You should have a custom parser for these models, as they will return strings.
- json_instruction_and_object: append instructions to the prompt to request json matching the schema. Also request the response as json_mode via API capabilities (returning dictionaries).
- json_custom_instructions: The model should output JSON, but custom instructions are already included in the system prompt. Don't append additional JSON instructions.
- default: let the adapter decide (legacy, do not use for new use cases)
- unknown: used for cases where the structured output mode is not known (on old models where it wasn't saved). Should look up the best option at runtime.
126class Task( 127 KilnParentedModel, 128 KilnParentModel, 129 parent_of={ 130 "runs": TaskRun, 131 "dataset_splits": DatasetSplit, 132 "finetunes": Finetune, 133 "prompt_optimization_jobs": PromptOptimizationJob, 134 "prompts": Prompt, 135 "evals": Eval, 136 "specs": Spec, 137 "run_configs": TaskRunConfig, 138 }, 139): 140 """ 141 Represents a specific task to be performed, with associated requirements and validation rules. 142 143 Contains the task definition, requirements, input/output schemas, and maintains 144 a collection of task runs. 145 """ 146 147 name: FilenameString = Field(description="The name of the task.") 148 description: str | None = Field( 149 default=None, 150 description="A description of the task for you and your team. Will not be used in prompts/training/validation.", 151 ) 152 instruction: str = Field( 153 min_length=1, 154 description="The instructions for the task. Will be used in prompts/training/validation.", 155 ) 156 requirements: List[TaskRequirement] = Field( 157 default=[], 158 description="Deprecated: Use specs and prompts instead.", 159 ) 160 output_json_schema: JsonObjectSchema | None = Field( 161 default=None, 162 description="JSON schema for structured task output. Must be an object schema.", 163 ) 164 input_json_schema: JsonSchema | None = Field( 165 default=None, 166 description="JSON schema for structured task input. Can be an object or array schema.", 167 ) 168 thinking_instruction: str | None = Field( 169 default=None, 170 description="Instructions for the model 'thinking' about the requirement prior to answering. Used for chain of thought style prompting.", 171 ) 172 173 default_run_config_id: ID_TYPE | None = Field( 174 default=None, 175 description="ID of the run config to use for this task by default. 
Must exist in saved run configs for this task.", 176 ) 177 178 def output_schema(self) -> Dict | None: 179 if self.output_json_schema is None: 180 return None 181 return schema_from_json_str(self.output_json_schema) 182 183 def input_schema(self) -> Dict | None: 184 if self.input_json_schema is None: 185 return None 186 # Allow arrays, not just objects 187 return schema_from_json_str(self.input_json_schema, require_object=False) 188 189 # These wrappers help for typechecking. We should fix this in KilnParentModel 190 def runs(self, readonly: bool = False) -> list[TaskRun]: 191 return super().runs(readonly=readonly) # type: ignore 192 193 def dataset_splits(self, readonly: bool = False) -> list[DatasetSplit]: 194 return super().dataset_splits(readonly=readonly) # type: ignore 195 196 def finetunes(self, readonly: bool = False) -> list[Finetune]: 197 return super().finetunes(readonly=readonly) # type: ignore 198 199 def prompts(self, readonly: bool = False) -> list[Prompt]: 200 return super().prompts(readonly=readonly) # type: ignore 201 202 def evals(self, readonly: bool = False) -> list[Eval]: 203 return super().evals(readonly=readonly) # type: ignore 204 205 def run_configs(self, readonly: bool = False) -> list[TaskRunConfig]: 206 return super().run_configs(readonly=readonly) # type: ignore 207 208 def specs(self, readonly: bool = False) -> list[Spec]: 209 return super().specs(readonly=readonly) # type: ignore 210 211 def prompt_optimization_jobs( 212 self, readonly: bool = False 213 ) -> list[PromptOptimizationJob]: 214 return super().prompt_optimization_jobs(readonly=readonly) # type: ignore 215 216 # Workaround to return typed parent without importing Task 217 def parent_project(self) -> Union["Project", None]: 218 if self.parent is None or self.parent.__class__.__name__ != "Project": 219 return None 220 return self.parent # type: ignore
Represents a specific task to be performed, with associated requirements and validation rules.
Contains the task definition, requirements, input/output schemas, and maintains a collection of task runs.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
743 def child_method(self, readonly: bool = False) -> list[child_class]: # type: ignore[invalid-type-form] 744 return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
325class TaskOutput(KilnBaseModel): 326 """ 327 An output for a specific task run. 328 329 Contains the actual output content, its source (human or synthetic), 330 and optional rating information. 331 """ 332 333 output: str = Field( 334 description="The output of the task. JSON formatted for structured output, plaintext for unstructured output." 335 ) 336 source: DataSource | None = Field( 337 description="The source of the output: human or synthetic.", 338 default=None, 339 ) 340 rating: TaskOutputRating | None = Field( 341 default=None, description="The rating of the output" 342 ) 343 344 def validate_output_format(self, task: "Task") -> Self: 345 # validate output 346 if task.output_json_schema is not None: 347 try: 348 output_parsed = json.loads(self.output) 349 except json.JSONDecodeError: 350 raise ValueError("Output is not a valid JSON object") 351 352 validate_schema_with_value_error( 353 output_parsed, 354 task.output_json_schema, 355 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 356 ) 357 return self 358 359 @model_validator(mode="after") 360 def validate_output_source(self, info: ValidationInfo) -> Self: 361 # On strict mode and not loaded from file, we validate output_source is not None. 362 # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data. 363 if not strict_mode(): 364 return self 365 if self.loaded_from_file(info): 366 return self 367 if self.source is None: 368 raise ValueError("Output source is required when strict mode is enabled") 369 return self
An output for a specific task run.
Contains the actual output content, its source (human or synthetic), and optional rating information.
344 def validate_output_format(self, task: "Task") -> Self: 345 # validate output 346 if task.output_json_schema is not None: 347 try: 348 output_parsed = json.loads(self.output) 349 except json.JSONDecodeError: 350 raise ValueError("Output is not a valid JSON object") 351 352 validate_schema_with_value_error( 353 output_parsed, 354 task.output_json_schema, 355 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 356 ) 357 return self
359 @model_validator(mode="after") 360 def validate_output_source(self, info: ValidationInfo) -> Self: 361 # On strict mode and not loaded from file, we validate output_source is not None. 362 # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data. 363 if not strict_mode(): 364 return self 365 if self.loaded_from_file(info): 366 return self 367 if self.source is None: 368 raise ValueError("Output source is required when strict mode is enabled") 369 return self
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
337def init_private_attributes(self: BaseModel, context: Any, /) -> None: 338 """This function is meant to behave like a BaseModel method to initialise private attributes. 339 340 It takes context as an argument since that's what pydantic-core passes when calling it. 341 342 Args: 343 self: The BaseModel instance. 344 context: The context. 345 """ 346 if getattr(self, '__pydantic_private__', None) is None: 347 pydantic_private = {} 348 for name, private_attr in self.__private_attributes__.items(): 349 default = private_attr.get_default() 350 if default is not PydanticUndefined: 351 pydantic_private[name] = default 352 object_setattr(self, '__pydantic_private__', pydantic_private)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
50class TaskOutputRating(KilnBaseModel): 51 """ 52 A rating for a task output, including an overall rating and ratings for each requirement. 53 54 Supports: 55 - five_star: 1-5 star ratings 56 - pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail) 57 - pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail) 58 """ 59 60 type: TaskOutputRatingType = Field( 61 default=TaskOutputRatingType.five_star, 62 description="The rating system used for this rating.", 63 ) 64 value: float | None = Field( 65 description="The rating value. Interpretation depends on rating type:\n- five_star: 1-5 stars\n- pass_fail: 1.0 (pass) or 0.0 (fail)\n- pass_fail_critical: 1.0 (pass), 0.0 (fail), or -1.0 (critical fail)", 66 default=None, 67 ) 68 requirement_ratings: Dict[ID_TYPE, RequirementRating] = Field( 69 default={}, 70 description="The ratings of the requirements of the task. The ID can be either a task_requirement_id or a named rating for an eval_output_score name (in format 'named::<name>').", 71 ) 72 73 # Previously we stored rating values as a dict of floats, but now we store them as RequirementRating objects. 
74 @model_validator(mode="before") 75 def upgrade_old_format(cls, data: dict) -> dict: 76 if not isinstance(data, dict): 77 return data 78 79 # Check if we have the old format (dict of floats) 80 req_ratings = data.get("requirement_ratings", {}) 81 if req_ratings and all( 82 isinstance(v, (int, float)) for v in req_ratings.values() 83 ): 84 # Convert each float to a RequirementRating object 85 # all ratings are five star at the point we used this format 86 data["requirement_ratings"] = { 87 k: {"value": v, "type": TaskOutputRatingType.five_star} 88 for k, v in req_ratings.items() 89 } 90 91 return data 92 93 # Used to select high quality outputs for example selection (MultiShotPromptBuilder, etc) 94 def is_high_quality(self) -> bool: 95 if self.value is None: 96 return False 97 98 if self.type == TaskOutputRatingType.five_star: 99 return self.value >= 4 100 elif self.type == TaskOutputRatingType.pass_fail: 101 return self.value == 1.0 102 elif self.type == TaskOutputRatingType.pass_fail_critical: 103 return self.value == 1.0 104 return False 105 106 @model_validator(mode="after") 107 def validate_rating(self) -> Self: 108 if self.type not in TaskOutputRatingType: 109 raise ValueError(f"Invalid rating type: {self.type}") 110 111 # Overall rating is optional 112 if self.value is not None: 113 self._validate_rating(self.type, self.value, "overall rating") 114 115 for req_id, req_rating in self.requirement_ratings.items(): 116 self._validate_rating( 117 req_rating.type, 118 req_rating.value, 119 f"requirement rating for req ID: {req_id}", 120 ) 121 122 return self 123 124 def _validate_rating( 125 self, type: TaskOutputRatingType, rating: float | None, rating_name: str 126 ) -> None: 127 if type == TaskOutputRatingType.five_star: 128 self._validate_five_star(rating, rating_name) 129 elif type == TaskOutputRatingType.pass_fail: 130 self._validate_pass_fail(rating, rating_name) 131 elif type == TaskOutputRatingType.pass_fail_critical: 132 
self._validate_pass_fail_critical(rating, rating_name) 133 134 def _validate_five_star(self, rating: float | None, rating_name: str) -> None: 135 if rating is None or not isinstance(rating, float) or not rating.is_integer(): 136 raise ValueError( 137 f"{rating_name.capitalize()} of type five_star must be an integer value (1-5)" 138 ) 139 if rating < 1 or rating > 5: 140 raise ValueError( 141 f"{rating_name.capitalize()} of type five_star must be between 1 and 5 stars" 142 ) 143 144 def _validate_pass_fail(self, rating: float | None, rating_name: str) -> None: 145 if rating is None or not isinstance(rating, float) or not rating.is_integer(): 146 raise ValueError( 147 f"{rating_name.capitalize()} of type pass_fail must be an integer value (0 or 1)" 148 ) 149 if rating not in [0, 1]: 150 raise ValueError( 151 f"{rating_name.capitalize()} of type pass_fail must be 0 (fail) or 1 (pass)" 152 ) 153 154 def _validate_pass_fail_critical( 155 self, rating: float | None, rating_name: str 156 ) -> None: 157 if rating is None or not isinstance(rating, float) or not rating.is_integer(): 158 raise ValueError( 159 f"{rating_name.capitalize()} of type pass_fail_critical must be an integer value (-1, 0, or 1)" 160 ) 161 if rating not in [-1, 0, 1]: 162 raise ValueError( 163 f"{rating_name.capitalize()} of type pass_fail_critical must be -1 (critical fail), 0 (fail), or 1 (pass)" 164 )
A rating for a task output, including an overall rating and ratings for each requirement.
Supports:
- five_star: 1-5 star ratings
- pass_fail: boolean pass/fail (1.0 = pass, 0.0 = fail)
- pass_fail_critical: tri-state (1.0 = pass, 0.0 = fail, -1.0 = critical fail)
74 @model_validator(mode="before") 75 def upgrade_old_format(cls, data: dict) -> dict: 76 if not isinstance(data, dict): 77 return data 78 79 # Check if we have the old format (dict of floats) 80 req_ratings = data.get("requirement_ratings", {}) 81 if req_ratings and all( 82 isinstance(v, (int, float)) for v in req_ratings.values() 83 ): 84 # Convert each float to a RequirementRating object 85 # all ratings are five star at the point we used this format 86 data["requirement_ratings"] = { 87 k: {"value": v, "type": TaskOutputRatingType.five_star} 88 for k, v in req_ratings.items() 89 } 90 91 return data
def is_high_quality(self) -> bool:
    """Whether this rating counts as high quality.

    five_star requires 4 stars or better; pass_fail and
    pass_fail_critical require an exact pass (1.0). Unrated outputs
    and custom rating types are never considered high quality.
    """
    rating = self.value
    if rating is None:
        return False

    rating_type = self.type
    if rating_type == TaskOutputRatingType.five_star:
        return rating >= 4
    if rating_type in (
        TaskOutputRatingType.pass_fail,
        TaskOutputRatingType.pass_fail_critical,
    ):
        return rating == 1.0
    return False
@model_validator(mode="after")
def validate_rating(self) -> Self:
    """Validate the overall rating and every per-requirement rating.

    Runs after model construction; range checks for each rating type
    are delegated to ``_validate_rating``. The overall rating is
    optional, requirement ratings are always checked.
    """
    if self.type not in TaskOutputRatingType:
        raise ValueError(f"Invalid rating type: {self.type}")

    # Collect every (type, value, name) triple to check, then validate
    # them in one pass: the optional overall rating first, followed by
    # each requirement rating in insertion order.
    pending_checks = []
    if self.value is not None:
        pending_checks.append((self.type, self.value, "overall rating"))
    for req_id, req_rating in self.requirement_ratings.items():
        pending_checks.append(
            (
                req_rating.type,
                req_rating.value,
                f"requirement rating for req ID: {req_id}",
            )
        )

    for rating_type, rating_value, rating_name in pending_checks:
        self._validate_rating(rating_type, rating_value, rating_name)

    return self
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Initialise a model's private attributes, BaseModel-method style.

    Accepts ``context`` because pydantic-core passes it when invoking
    this hook; the value itself is not used here.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    # Only initialise once: skip if private state already exists.
    if getattr(self, '__pydantic_private__', None) is not None:
        return

    defaults = {
        name: attr.get_default()
        for name, attr in self.__private_attributes__.items()
    }
    # Attributes without a concrete default (PydanticUndefined) are omitted.
    private_state = {
        name: value
        for name, value in defaults.items()
        if value is not PydanticUndefined
    }
    object_setattr(self, '__pydantic_private__', private_state)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
class TaskOutputRatingType(str, Enum):
    """Enumerates the rating systems that can be applied to a task output."""

    five_star = "five_star"  # integer star ratings, 1 through 5
    pass_fail = "pass_fail"  # binary: 1 (pass) or 0 (fail)
    pass_fail_critical = "pass_fail_critical"  # -1 (critical fail), 0 (fail), or 1 (pass)
    custom = "custom"  # caller-defined rating semantics
Defines the types of rating systems available for task outputs.
class TaskRequirement(BaseModel):
    """
    Defines a specific requirement that should be met by task outputs.

    Includes an identifier, name, description, instruction for meeting the requirement,
    priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
    """

    # Stable identifier for this requirement.
    id: ID_TYPE = ID_FIELD
    name: FilenameStringShort = Field(description="The name of the task requirement.")
    description: str | None = Field(
        default=None,
        description="Optional elaboration on the requirement's purpose.",
    )
    # min_length=1: an empty instruction is rejected at validation time.
    instruction: str = Field(
        min_length=1, description="Instructions for meeting the requirement."
    )
    priority: Priority = Field(
        default=Priority.p2, description="The priority level of the requirement."
    )
    # How outputs are rated against this requirement; defaults to 1-5 stars.
    type: TaskOutputRatingType = Field(
        default=TaskOutputRatingType.five_star,
        description="The rating type used to evaluate this requirement.",
    )
Defines a specific requirement that should be met by task outputs.
Includes an identifier, name, description, instruction for meeting the requirement, priority level, and rating type (five_star, pass_fail, pass_fail_critical, custom).
class TaskRun(
    KilnParentedModel,
    KilnParentModel,
    parent_of={
        "feedback": Feedback,
    },
):
    """
    Represents a single execution of a Task.

    Contains the input used, its source, the output produced, and optional
    repair information if the output needed correction.
    """

    input: str = Field(
        description="The inputs to the task. JSON formatted for structured input, plaintext for unstructured input."
    )
    input_source: DataSource | None = Field(
        default=None, description="The source of the input: human or synthetic."
    )

    output: TaskOutput = Field(description="The output of the task run.")
    repair_instructions: str | None = Field(
        default=None,
        description="Instructions for fixing the output. Should define what is wrong, and how to fix it. Will be used by models for both generating a fixed output, and evaluating future models.",
    )
    repaired_output: TaskOutput | None = Field(
        default=None,
        description="An version of the output with issues fixed. This must be a 'fixed' version of the existing output, and not an entirely new output. If you wish to generate an ideal curatorial output for this task unrelated to this output, generate a new TaskOutput with type 'human' instead of using this field.",
    )
    intermediate_outputs: Dict[str, str] | None = Field(
        default=None,
        description="Intermediate outputs from the task run. Keys are the names of the intermediate output steps (cot=chain of thought, etc), values are the output data.",
    )
    tags: List[str] = Field(
        default=[],
        description="Tags for the task run. Tags are used to categorize task runs for filtering and reporting.",
    )
    usage: Usage | None = Field(
        default=None,
        description="Usage information for the task run. This includes the number of input tokens, output tokens, and total tokens used.",
    )
    trace: list[ChatCompletionMessageParam] | None = Field(
        default=None,
        description="The trace of the task run in OpenAI format. This is the list of messages that were sent to/from the model.",
    )
    parent_task_run_id: str | None = Field(
        default=None,
        description="The ID of the parent task run. This is the ID of the task run that contains this task run.",
    )

    @property
    def is_toolcall_pending(self) -> bool:
        """True if the trace ends with an assistant message awaiting client tool execution."""
        return trace_has_pending_client_tool_calls(self.trace)

    def thinking_training_data(self) -> str | None:
        """
        Get the thinking training data from the task run.

        Prefers the "reasoning" intermediate output; falls back to
        "chain_of_thought". Returns None if neither is present.
        """
        if self.intermediate_outputs is None:
            return None
        return self.intermediate_outputs.get(
            "reasoning"
        ) or self.intermediate_outputs.get("chain_of_thought")

    def has_thinking_training_data(self) -> bool:
        """
        Does this run have thinking data that we can use to train a thinking model?
        """
        return self.thinking_training_data() is not None

    def feedback(self, readonly: bool = False) -> list[Feedback]:
        # Typed wrapper over the accessor generated via parent_of above.
        return super().feedback(readonly=readonly)  # type: ignore

    # Workaround to return typed parent without importing Task
    def parent_task(self) -> Union["Task", None]:
        if self.parent is None or self.parent.__class__.__name__ != "Task":
            return None
        return self.parent  # type: ignore

    @model_validator(mode="after")
    def validate_input_format(self, info: ValidationInfo) -> Self:
        """Validate `input` against the parent task's input JSON schema.

        Skipped when loading from file or when the input is unchanged
        since the last validation (`_last_validated_input` cache).
        """
        # Don't validate if loading from file (not new). Too slow.
        # We don't allow changing task schema, so this is redundant validation.
        # Note: we still validate if editing a loaded model
        if self.loading_from_file(info):
            # Consider loading an existing model as validated.
            self._last_validated_input = self.input
            return self

        # Don't validate if input has not changed. Too slow to run this every time.
        if (
            hasattr(self, "_last_validated_input")
            and self.input == self._last_validated_input
        ):
            return self

        task = self.parent_task()
        if task is None:
            # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
            return self

        # validate input
        if task.input_json_schema is not None:
            try:
                input_parsed = json.loads(self.input)
            except json.JSONDecodeError:
                raise ValueError("Input is not a valid JSON object")

            validate_schema_with_value_error(
                input_parsed,
                task.input_json_schema,
                "Input does not match task input schema.",
                require_object=False,
            )

        self._last_validated_input = self.input
        return self

    @model_validator(mode="after")
    def validate_output_format(self, info: ValidationInfo) -> Self:
        """Validate the output against the parent task's output JSON schema.

        Skipped when loading from file, when a tool call is pending, or
        when the output is unchanged (`_last_validated_output` cache).
        """
        # Don't validate if loading from file (not new). Too slow.
        # Note: we still validate if editing a loaded model's output.
        if self.loading_from_file(info):
            # Consider loading an existing model as validated.
            self._last_validated_output = self.output.output if self.output else None
            return self

        # Skip output validation when the run is waiting for tool call results.
        # The output field is empty/partial in this state.
        if self.is_toolcall_pending:
            self._last_validated_output = self.output.output if self.output else None
            return self

        # Don't validate unless output has changed since last validation.
        # The validator is slow and costly, don't want it running when setting other fields.
        if (
            hasattr(self, "_last_validated_output")
            and self.output is not None
            and self.output.output == self._last_validated_output
        ):
            return self

        task = self.parent_task()
        if task is None:
            return self

        self.output.validate_output_format(task)
        self._last_validated_output = self.output.output if self.output else None
        return self

    @model_validator(mode="after")
    def validate_repaired_output(self) -> Self:
        """Check repaired_output/repair_instructions consistency.

        A repaired output must be unrated, must match the task output
        schema (when both are available), and must be paired with
        repair instructions — each requires the other.
        """
        if self.repaired_output is not None:
            if self.repaired_output.rating is not None:
                raise ValueError(
                    "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
                )

            task = self.parent_task()
            if (
                task is not None
                and self.repaired_output.output is not None
                and task.output_json_schema is not None
            ):
                try:
                    output_parsed = json.loads(self.repaired_output.output)
                except json.JSONDecodeError:
                    raise ValueError("Repaired output is not a valid JSON object")

                validate_schema_with_value_error(
                    output_parsed,
                    task.output_json_schema,
                    "Repaired output does not match task output schema.",
                )

        if self.repair_instructions is None and self.repaired_output is not None:
            raise ValueError(
                "Repair instructions are required if providing a repaired output."
            )
        if self.repair_instructions is not None and self.repaired_output is None:
            raise ValueError(
                "A repaired output is required if providing repair instructions."
            )

        return self

    @model_validator(mode="after")
    def validate_input_source(self, info: ValidationInfo) -> Self:
        """Require an input_source for newly created runs when strict mode is on."""
        # On strict mode and not loaded from file, we validate input_source is not None.
        # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
        if not strict_mode():
            return self
        if self.loaded_from_file(info):
            return self
        if self.input_source is None:
            raise ValueError("input_source is required when strict mode is enabled")
        return self

    @model_validator(mode="after")
    def validate_tags(self) -> Self:
        """Reject empty tags and tags containing spaces."""
        for tag in self.tags:
            if not tag:
                raise ValueError("Tags cannot be empty strings")
            if " " in tag:
                raise ValueError("Tags cannot contain spaces. Try underscores.")

        return self
Represents a single execution of a Task.
Contains the input used, its source, the output produced, and optional repair information if the output needed correction.
@property
def is_toolcall_pending(self) -> bool:
    """Whether the run is paused waiting on client-side tool execution.

    True when the trace ends with an assistant message whose tool calls
    have not yet been answered.
    """
    pending = trace_has_pending_client_tool_calls(self.trace)
    return pending
True if the trace ends with an assistant message awaiting client tool execution.
145 def thinking_training_data(self) -> str | None: 146 """ 147 Get the thinking training data from the task run. 148 """ 149 if self.intermediate_outputs is None: 150 return None 151 return self.intermediate_outputs.get( 152 "reasoning" 153 ) or self.intermediate_outputs.get("chain_of_thought")
Get the thinking training data from the task run.
def has_thinking_training_data(self) -> bool:
    """Whether this run carries thinking data usable to train a thinking model."""
    data = self.thinking_training_data()
    return data is not None
Does this run have thinking data that we can use to train a thinking model?
def child_method(self, readonly: bool = False) -> list[child_class]:  # type: ignore[invalid-type-form]
    # Generated accessor: returns all `child_class` children found under this
    # model's path. NOTE(review): `child_class` is a free variable here —
    # presumably bound by an enclosing factory (not visible in this chunk);
    # confirm against the surrounding definition.
    return child_class.all_children_of_parent_path(self.path, readonly=readonly)
The type of the None singleton.
@model_validator(mode="after")
def validate_input_format(self, info: ValidationInfo) -> Self:
    """Validate `input` against the parent task's input JSON schema.

    Skipped when loading from file or when the input has not changed
    since the last validation (cached in `_last_validated_input`).
    """
    # Don't validate if loading from file (not new). Too slow.
    # We don't allow changing task schema, so this is redundant validation.
    # Note: we still validate if editing a loaded model
    if self.loading_from_file(info):
        # Consider loading an existing model as validated.
        self._last_validated_input = self.input
        return self

    # Don't validate if input has not changed. Too slow to run this every time.
    if (
        hasattr(self, "_last_validated_input")
        and self.input == self._last_validated_input
    ):
        return self

    task = self.parent_task()
    if task is None:
        # don't validate this relationship until we have a path or parent. Give them time to build it (but will catch it before saving)
        return self

    # validate input
    if task.input_json_schema is not None:
        try:
            input_parsed = json.loads(self.input)
        except json.JSONDecodeError:
            raise ValueError("Input is not a valid JSON object")

        validate_schema_with_value_error(
            input_parsed,
            task.input_json_schema,
            "Input does not match task input schema.",
            require_object=False,
        )

    self._last_validated_input = self.input
    return self
@model_validator(mode="after")
def validate_output_format(self, info: ValidationInfo) -> Self:
    """Validate the output against the parent task's output JSON schema.

    Skipped when loading from file, when a tool call is pending, or when
    the output is unchanged (cached in `_last_validated_output`).
    """
    # Don't validate if loading from file (not new). Too slow.
    # Note: we still validate if editing a loaded model's output.
    if self.loading_from_file(info):
        # Consider loading an existing model as validated.
        self._last_validated_output = self.output.output if self.output else None
        return self

    # Skip output validation when the run is waiting for tool call results.
    # The output field is empty/partial in this state.
    if self.is_toolcall_pending:
        self._last_validated_output = self.output.output if self.output else None
        return self

    # Don't validate unless output has changed since last validation.
    # The validator is slow and costly, don't want it running when setting other fields.
    if (
        hasattr(self, "_last_validated_output")
        and self.output is not None
        and self.output.output == self._last_validated_output
    ):
        return self

    task = self.parent_task()
    if task is None:
        return self

    self.output.validate_output_format(task)
    self._last_validated_output = self.output.output if self.output else None
    return self
@model_validator(mode="after")
def validate_repaired_output(self) -> Self:
    """Check repaired_output/repair_instructions consistency.

    A repaired output must be unrated, must match the task output schema
    (when both are available), and must be paired with repair
    instructions — each requires the other.
    """
    if self.repaired_output is not None:
        if self.repaired_output.rating is not None:
            raise ValueError(
                "Repaired output rating must be None. Repaired outputs are assumed to have a perfect rating, as they have been fixed."
            )

        task = self.parent_task()
        if (
            task is not None
            and self.repaired_output.output is not None
            and task.output_json_schema is not None
        ):
            try:
                output_parsed = json.loads(self.repaired_output.output)
            except json.JSONDecodeError:
                raise ValueError("Repaired output is not a valid JSON object")

            validate_schema_with_value_error(
                output_parsed,
                task.output_json_schema,
                "Repaired output does not match task output schema.",
            )

    if self.repair_instructions is None and self.repaired_output is not None:
        raise ValueError(
            "Repair instructions are required if providing a repaired output."
        )
    if self.repair_instructions is not None and self.repaired_output is None:
        raise ValueError(
            "A repaired output is required if providing repair instructions."
        )

    return self
@model_validator(mode="after")
def validate_input_source(self, info: ValidationInfo) -> Self:
    """Require an input_source for newly created runs when strict mode is on."""
    # On strict mode and not loaded from file, we validate input_source is not None.
    # We want to be able to load any data, even if it's not perfect. But we want to create perfect data when adding new data.
    if not strict_mode():
        return self
    if self.loaded_from_file(info):
        return self
    if self.input_source is None:
        raise ValueError("input_source is required when strict mode is enabled")
    return self
The type of the None singleton.
Configuration for the model, should be a dictionary conforming to [ConfigDict][pydantic.config.ConfigDict].
def init_private_attributes(self: BaseModel, context: Any, /) -> None:
    """Populate private-attribute defaults on a freshly created model.

    Behaves like a BaseModel method; ``context`` is accepted (and
    ignored) because pydantic-core supplies it when calling this hook.

    Args:
        self: The BaseModel instance.
        context: The context.
    """
    already_initialised = getattr(self, '__pydantic_private__', None) is not None
    if already_initialised:
        return

    private_values = {}
    for attr_name, attr in self.__private_attributes__.items():
        attr_default = attr.get_default()
        # Attributes without a concrete default are left unset.
        if attr_default is PydanticUndefined:
            continue
        private_values[attr_name] = attr_default
    object_setattr(self, '__pydantic_private__', private_values)
This function is meant to behave like a BaseModel method to initialise private attributes.
It takes context as an argument since that's what pydantic-core passes when calling it.
Args: self: The BaseModel instance. context: The context.
class Usage(BaseModel):
    """Token usage and cost information for a task run."""

    # All fields are optional: not every provider reports every metric.
    input_tokens: int | None = Field(
        default=None,
        description="The number of input tokens used in the task run.",
        ge=0,
    )
    output_tokens: int | None = Field(
        default=None,
        description="The number of output tokens used in the task run.",
        ge=0,
    )
    total_tokens: int | None = Field(
        default=None,
        description="The total number of tokens used in the task run.",
        ge=0,
    )
    cost: float | None = Field(
        default=None,
        description="The cost of the task run in US dollars, saved at runtime (prices can change over time).",
        ge=0,
    )
    cached_tokens: int | None = Field(
        default=None,
        description="Number of tokens served from prompt cache. None if not reported.",
        ge=0,
    )

    def __add__(self, other: "Usage") -> "Usage":
        """Combine two Usage records field by field.

        None acts as the identity: None + None stays None, and None
        combined with a number yields that number unchanged.
        """
        if not isinstance(other, Usage):
            raise TypeError(f"Cannot add Usage with {type(other).__name__}")

        def _combine(a, b):
            # None means "unreported", not zero — preserve it when both sides
            # are missing, otherwise keep whichever value is present.
            if a is None:
                return b
            if b is None:
                return a
            return a + b

        return Usage(
            input_tokens=_combine(self.input_tokens, other.input_tokens),
            output_tokens=_combine(self.output_tokens, other.output_tokens),
            total_tokens=_combine(self.total_tokens, other.total_tokens),
            cost=_combine(self.cost, other.cost),
            cached_tokens=_combine(self.cached_tokens, other.cached_tokens),
        )
Token usage and cost information for a task run.