kiln_ai.adapters.model_adapters.base_adapter
import json
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, Tuple

from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter
from kiln_ai.adapters.ml_model_list import (
    KilnModelProvider,
    StructuredOutputMode,
    default_structured_output_mode_for_model_provider,
)
from kiln_ai.adapters.parsers.json_parser import parse_json_string
from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id
from kiln_ai.adapters.provider_tools import kiln_model_provider_from
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import (
    DataSource,
    DataSourceType,
    Task,
    TaskOutput,
    TaskRun,
    Usage,
)
from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
from kiln_ai.datamodel.task import RunConfigProperties
from kiln_ai.tools import KilnToolInterface
from kiln_ai.tools.tool_registry import tool_from_id
from kiln_ai.utils.config import Config
from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam


@dataclass
class AdapterConfig:
    """
    An adapter config contains config options that do NOT impact the output of the model.

    For example: whether it's saved, or whether we request additional data like logprobs.
    """

    allow_saving: bool = True
    top_logprobs: int | None = None
    default_tags: list[str] | None = None

    """
    A custom prompt builder can be injected to override the system prompt building process.
    If not provided, the prompt builder will be created from the run_config.prompt_id which
    may load additional files from disk.
    """
    prompt_builder: BasePromptBuilder | None = None


class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.

    Prompt building is handled internally by the adapter, which uses a prompt builder
    based on the run config. To override the prompt building behavior, pass a custom prompt
    builder to the adapter config.
    """

    def __init__(
        self,
        task: Task,
        run_config: RunConfigProperties,
        config: AdapterConfig | None = None,
    ):
        self.task = task
        self.run_config: RunConfigProperties = run_config
        self.update_run_config_unknown_structured_output_mode()
        self.base_adapter_config = config or AdapterConfig()

        self.prompt_builder = (
            self.base_adapter_config.prompt_builder
            or prompt_builder_from_id(run_config.prompt_id, task)
        )
        self._model_provider: KilnModelProvider | None = None

        self.output_schema = task.output_json_schema
        self.input_schema = task.input_json_schema

    def model_provider(self) -> KilnModelProvider:
        """
        Lazy load the model provider for this adapter.
        """
        if self._model_provider is not None:
            return self._model_provider
        if not self.run_config.model_name or not self.run_config.model_provider_name:
            raise ValueError("model_name and model_provider_name must be provided")
        self._model_provider = kiln_model_provider_from(
            self.run_config.model_name, self.run_config.model_provider_name
        )
        if not self._model_provider:
            raise ValueError(
                f"model_provider_name {self.run_config.model_provider_name} not found for model {self.run_config.model_name}"
            )
        return self._model_provider

    async def invoke(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        run_output, _ = await self.invoke_returning_run_output(input, input_source)
        return run_output

    async def invoke_returning_run_output(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        # Validate input, allowing arrays
        if self.input_schema is not None:
            validate_schema_with_value_error(
                input,
                self.input_schema,
                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
                require_object=False,
            )

        # Format model input for model call (we save the original input in the task without formatting)
        formatted_input = input
        formatter_id = self.model_provider().formatter
        if formatter_id is not None:
            formatter = request_formatter_from_id(formatter_id)
            formatted_input = formatter.format_input(input)

        # Run
        run_output, usage = await self._run(formatted_input)

        # Parse
        provider = self.model_provider()
        parser = model_parser_from_id(provider.parser)
        parsed_output = parser.parse_output(original_output=run_output)

        # Validate output
        if self.output_schema is not None:
            # Parse JSON to dict if we have structured output
            if isinstance(parsed_output.output, str):
                parsed_output.output = parse_json_string(parsed_output.output)

            if not isinstance(parsed_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {parsed_output.output}"
                )
            validate_schema_with_value_error(
                parsed_output.output,
                self.output_schema,
                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )
        else:
            if not isinstance(parsed_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {parsed_output.output}"
                )

        # Validate reasoning content is present when required
        # We don't require reasoning when using tools, as models tend not to return any on the final turn (both Sonnet and Gemini).
        trace_has_toolcalls = parsed_output.trace is not None and any(
            message.get("role", None) == "tool" for message in parsed_output.trace
        )
        if (
            provider.reasoning_capable
            and (
                not parsed_output.intermediate_outputs
                or "reasoning" not in parsed_output.intermediate_outputs
            )
            and not (
                provider.reasoning_optional_for_structured_output
                and self.has_structured_output()
            )
            and not (trace_has_toolcalls)
        ):
            raise RuntimeError(
                "Reasoning is required for this model, but no reasoning was returned."
            )

        # Generate the run and output
        run = self.generate_run(
            input, input_source, parsed_output, usage, run_output.trace
        )

        # Save the run if configured to do so, and we have a path to save to
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run, run_output

    def has_structured_output(self) -> bool:
        return self.output_schema is not None

    @abstractmethod
    def adapter_name(self) -> str:
        pass

    @abstractmethod
    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
        pass

    def build_prompt(self) -> str:
        # The prompt builder needs to know if we want to inject formatting instructions
        structured_output_mode = self.run_config.structured_output_mode
        add_json_instructions = self.has_structured_output() and (
            structured_output_mode == StructuredOutputMode.json_instructions
            or structured_output_mode
            == StructuredOutputMode.json_instruction_and_object
        )

        return self.prompt_builder.build_prompt(
            include_json_instructions=add_json_instructions
        )

    def build_chat_formatter(self, input: InputType) -> ChatFormatter:
        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and whether the model was finetuned with a specific chat strategy.

        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
        system_message = self.build_prompt()

        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
        if not cot_prompt:
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn,
                system_message=system_message,
                user_input=input,
            )

        # Some models, like finetunes, are trained with a specific chat strategy. Use that.
        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
            return get_chat_formatter(
                strategy=tuned_chat_strategy,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

        # Pick the best chat strategy for the model given it has a COT prompt.
        reasoning_capable = self.model_provider().reasoning_capable
        if reasoning_capable:
            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
            # A simple message with the COT prompt appended to the message list is sufficient
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn_r1_thinking,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
        else:
            # Unstructured output with COT
            # Two calls to separate the thinking from the final response
            return get_chat_formatter(
                strategy=ChatStrategy.two_message_cot,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

    # Create a run and task output
    def generate_run(
        self,
        input: InputType,
        input_source: DataSource | None,
        run_output: RunOutput,
        usage: Usage | None = None,
        trace: list[ChatCompletionMessageParam] | None = None,
    ) -> TaskRun:
        # Convert input and output to JSON strings if they aren't strings
        input_str = (
            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
        )
        output_str = (
            json.dumps(run_output.output, ensure_ascii=False)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        new_task_run = TaskRun(
            parent=self.task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                # Synthetic since an adapter, not a human, is creating this
                source=DataSource(
                    type=DataSourceType.synthetic,
                    properties=self._properties_for_task_output(),
                    run_config=self.run_config,
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.base_adapter_config.default_tags or [],
            usage=usage,
            trace=trace,
        )

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        props = {}

        props["adapter_name"] = self.adapter_name()

        # Legacy properties where we save the run_config details into custom properties.
        # These are now also saved in the run_config field.
        props["model_name"] = self.run_config.model_name
        props["model_provider"] = self.run_config.model_provider_name
        props["prompt_id"] = self.run_config.prompt_id
        props["structured_output_mode"] = self.run_config.structured_output_mode
        props["temperature"] = self.run_config.temperature
        props["top_p"] = self.run_config.top_p

        return props

    def update_run_config_unknown_structured_output_mode(self) -> None:
        structured_output_mode = self.run_config.structured_output_mode

        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
        # Look up our recommended mode from ml_model_list if we have one
        if structured_output_mode == StructuredOutputMode.unknown:
            new_run_config = self.run_config.model_copy(deep=True)
            structured_output_mode = default_structured_output_mode_for_model_provider(
                self.run_config.model_name,
                self.run_config.model_provider_name,
            )
            new_run_config.structured_output_mode = structured_output_mode
            self.run_config = new_run_config

    async def available_tools(self) -> list[KilnToolInterface]:
        tool_config = self.run_config.tools_config
        if tool_config is None or tool_config.tools is None:
            return []

        project = self.task.parent_project()
        if project is None:
            raise ValueError("Task must have a parent project to resolve tools")

        project_id = project.id
        if project_id is None:
            raise ValueError("Project must have an ID to resolve tools")

        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]

        # Check each tool has a unique name
        tool_names = [await tool.name() for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            raise ValueError(
                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
            )

        return tools
@dataclass
class AdapterConfig:
An adapter config contains config options that do NOT impact the output of the model.

For example: whether it's saved, or whether we request additional data like logprobs.
AdapterConfig(
    allow_saving: bool = True,
    top_logprobs: int | None = None,
    default_tags: list[str] | None = None,
    prompt_builder: kiln_ai.adapters.prompt_builders.BasePromptBuilder | None = None,
)
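A minimal usage sketch: the field names below come from the dataclass above, while the code that would pass this config into a concrete adapter is assumed and not shown.

from kiln_ai.adapters.model_adapters.base_adapter import AdapterConfig

# Config for an evaluation-style run: request logprobs and tag every run,
# but never write runs to disk even if autosave is enabled.
config = AdapterConfig(
    allow_saving=False,           # skips run.save_to_file(); run.id is cleared instead
    top_logprobs=5,               # ask the provider for top-5 logprobs, where supported
    default_tags=["eval-batch"],  # copied onto every generated TaskRun
)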
class BaseAdapter:
Base class for AI model adapters that handle task execution.
This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.
Prompt building is handled internally by the adapter, which uses a prompt builder based on the run config. To override the prompt building behavior, pass a custom prompt builder to the adapter config.
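BaseAdapter is abstract; a concrete adapter only has to supply adapter_name() and _run(). The sketch below shows the minimal shape, using a trivial echo in place of a real model call; the RunOutput keyword arguments are an assumption, since that class is defined outside this module.

from typing import Tuple

from kiln_ai.adapters.model_adapters.base_adapter import BaseAdapter
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import Usage
from kiln_ai.datamodel.datamodel_enums import InputType


class EchoAdapter(BaseAdapter):
    # Toy adapter for illustration: echoes its input instead of calling a model.

    def adapter_name(self) -> str:
        return "echo_adapter"

    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
        # A real adapter would build messages via self.build_chat_formatter(input)
        # and call its model provider here.
        text = input if isinstance(input, str) else str(input)
        # Assumption: RunOutput accepts these keyword arguments.
        return RunOutput(output=text, intermediate_outputs=None), None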
def model_provider(self) -> KilnModelProvider:
Lazy load the model provider for this adapter.
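A short sketch of the lazy-loading contract: the first call resolves the provider from the run config via kiln_model_provider_from and caches it, so repeated calls return the same object. Here `adapter` is assumed to be an existing concrete BaseAdapter instance.

provider = adapter.model_provider()          # resolved from model_name / model_provider_name
assert adapter.model_provider() is provider  # cached on self._model_provider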
async def invoke(
    self,
    input: Union[Dict[str, Any], List[Any], str],
    input_source: kiln_ai.datamodel.DataSource | None = None,
) -> kiln_ai.datamodel.TaskRun:
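Typical call pattern, assuming `adapter` is a concrete subclass instance built from an existing Task and RunConfigProperties, and that the input dict satisfies the task's input schema.

import asyncio


async def main() -> None:
    run = await adapter.invoke({"question": "What is the capital of France?"})
    print(run.output.output)  # final output; a JSON string for structured tasks
    print(run.id)             # None when the run was not autosaved


asyncio.run(main())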
async def invoke_returning_run_output(
    self,
    input: Union[Dict[str, Any], List[Any], str],
    input_source: kiln_ai.datamodel.DataSource | None = None,
) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
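Use this variant when you also want the raw model output alongside the validated TaskRun; the "reasoning" intermediate output is only present for reasoning-capable models. As above, `adapter` is assumed to exist.

task_run, raw_output = await adapter.invoke_returning_run_output(
    {"question": "Summarize the plot of Hamlet."}
)
print(task_run.output.output)                                  # parsed, schema-validated output
print((task_run.intermediate_outputs or {}).get("reasoning"))  # reasoning, when the model returns it
print(raw_output.output)                                       # unparsed output as produced by _run()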
def build_prompt(self) -> str:
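build_prompt() injects JSON formatting instructions only for the json_instructions and json_instruction_and_object modes; other modes leave the prompt untouched and rely on provider-side structured output. A hedged sketch, assuming an adapter whose task defines an output schema (mutating run_config in place is for illustration only).

from kiln_ai.adapters.ml_model_list import StructuredOutputMode

adapter.run_config.structured_output_mode = StructuredOutputMode.json_instructions
print(adapter.build_prompt())  # system prompt now includes JSON formatting guidance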
def build_chat_formatter(
    self, input: Union[Dict[str, Any], List[Any], str]
) -> kiln_ai.adapters.chat.ChatFormatter:
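The strategy selection in this method can be restated as a small decision helper; the helper mirrors the source above but is not part of the module.

from kiln_ai.datamodel.datamodel_enums import ChatStrategy


def pick_strategy(cot_prompt, tuned_strategy, reasoning_capable):
    # No chain-of-thought prompt: always a single turn.
    if not cot_prompt:
        return ChatStrategy.single_turn
    # A tuned, non-single-turn strategy wins when a COT prompt is present.
    if tuned_strategy and tuned_strategy != ChatStrategy.single_turn:
        return tuned_strategy
    # Otherwise pick based on whether the model natively emits reasoning.
    return (
        ChatStrategy.single_turn_r1_thinking
        if reasoning_capable
        else ChatStrategy.two_message_cot
    )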
def generate_run(
    self,
    input: Union[Dict[str, Any], List[Any], str],
    input_source: kiln_ai.datamodel.DataSource | None,
    run_output: kiln_ai.adapters.run_output.RunOutput,
    usage: kiln_ai.datamodel.Usage | None = None,
    trace: list[
        typing.Union[
            openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam,
            openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam,
            openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam,
            kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper,
            kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper,
            openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam,
        ]
    ] | None = None,
) -> kiln_ai.datamodel.TaskRun:
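generate_run is normally called by invoke_returning_run_output, but it can also wrap an already-parsed RunOutput into an unsaved TaskRun directly; as in the subclass sketch above, the RunOutput keyword arguments are an assumption.

from kiln_ai.adapters.run_output import RunOutput

# Assumption: RunOutput accepts these keyword arguments.
parsed = RunOutput(output={"answer": "Paris"}, intermediate_outputs=None)
task_run = adapter.generate_run(
    input={"question": "What is the capital of France?"},
    input_source=None,  # falls back to a human DataSource for the current user
    run_output=parsed,
)
print(task_run.output.source.type)  # DataSourceType.synthetic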
def update_run_config_unknown_structured_output_mode(self) -> None:
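This runs automatically in __init__, so a run config stored with an unknown structured output mode is replaced by a copy carrying the recommended default for its model/provider pair. A short sketch, where MyAdapter is any concrete subclass and `task` / `run_config` are assumed to exist.

from kiln_ai.adapters.ml_model_list import StructuredOutputMode

# run_config.structured_output_mode == StructuredOutputMode.unknown before construction
adapter = MyAdapter(task, run_config)
print(adapter.run_config.structured_output_mode)  # recommended default from ml_model_list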
async def available_tools(self) -> list[KilnToolInterface]:
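A usage sketch, assuming the run config has a tools_config with tool IDs selected; tool names are checked for uniqueness before the tools are returned.

tools = await adapter.available_tools()
for tool in tools:
    print(await tool.name())  # unique per tool, or a ValueError is raised earlier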