kiln_ai.adapters.model_adapters.base_adapter
1import json 2from abc import ABCMeta, abstractmethod 3from dataclasses import dataclass 4from typing import Dict, Tuple 5 6from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter 7from kiln_ai.adapters.ml_model_list import ( 8 KilnModelProvider, 9 StructuredOutputMode, 10 default_structured_output_mode_for_model_provider, 11) 12from kiln_ai.adapters.parsers.json_parser import parse_json_string 13from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id 14from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id 15from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id 16from kiln_ai.adapters.provider_tools import kiln_model_provider_from 17from kiln_ai.adapters.run_output import RunOutput 18from kiln_ai.datamodel import ( 19 DataSource, 20 DataSourceType, 21 Task, 22 TaskOutput, 23 TaskRun, 24 Usage, 25) 26from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType 27from kiln_ai.datamodel.json_schema import validate_schema_with_value_error 28from kiln_ai.datamodel.run_config import ( 29 KilnAgentRunConfigProperties, 30 as_kiln_agent_run_config, 31) 32from kiln_ai.datamodel.task import RunConfigProperties 33 34# Import agent run context for run lifecycle management 35from kiln_ai.run_context import ( 36 clear_agent_run_id, 37 generate_agent_run_id, 38 get_agent_run_id, 39 set_agent_run_id, 40) 41from kiln_ai.tools import KilnToolInterface 42from kiln_ai.tools.mcp_session_manager import MCPSessionManager 43from kiln_ai.tools.tool_registry import tool_from_id 44from kiln_ai.utils.config import Config 45from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error 46from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam 47 48 49@dataclass 50class AdapterConfig: 51 """ 52 An adapter config is config options that do NOT impact the output of the model. 53 54 For example: if it's saved, of if we request additional data like logprobs. 
55 """ 56 57 allow_saving: bool = True 58 top_logprobs: int | None = None 59 default_tags: list[str] | None = None 60 61 """ 62 A custom prompt builder can be injected to override the system prompt building process. 63 If not provided, the prompt builder will be created from the run_config.prompt_id which 64 may load additional files from disk. 65 """ 66 prompt_builder: BasePromptBuilder | None = None 67 68 69class BaseAdapter(metaclass=ABCMeta): 70 """Base class for AI model adapters that handle task execution. 71 72 This abstract class provides the foundation for implementing model-specific adapters 73 that can process tasks with structured or unstructured inputs/outputs. It handles 74 input/output validation, prompt building, and run tracking. 75 76 Prompt building is handled internally by the adapter, which uses a prompt builder 77 based on the run config. To override the prompt building behavior, pass a custom prompt 78 builder to the adapter config. 79 """ 80 81 def __init__( 82 self, 83 task: Task, 84 run_config: RunConfigProperties, 85 config: AdapterConfig | None = None, 86 ): 87 self.task = task 88 self.run_config: RunConfigProperties = run_config 89 self.base_adapter_config = config or AdapterConfig() 90 91 if isinstance(run_config, KilnAgentRunConfigProperties): 92 self.update_run_config_unknown_structured_output_mode() 93 self.prompt_builder = ( 94 self.base_adapter_config.prompt_builder 95 or prompt_builder_from_id(run_config.prompt_id, task) 96 ) 97 else: 98 self.prompt_builder = None 99 self._model_provider: KilnModelProvider | None = None 100 101 self.output_schema = task.output_json_schema 102 self.input_schema = task.input_json_schema 103 104 def model_provider(self) -> KilnModelProvider: 105 """ 106 Lazy load the model provider for this adapter. 
107 """ 108 if self._model_provider is not None: 109 return self._model_provider 110 run_config = as_kiln_agent_run_config(self.run_config) 111 if not run_config.model_name or not run_config.model_provider_name: 112 raise ValueError("model_name and model_provider_name must be provided") 113 self._model_provider = kiln_model_provider_from( 114 run_config.model_name, run_config.model_provider_name 115 ) 116 if not self._model_provider: 117 raise ValueError( 118 f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}" 119 ) 120 return self._model_provider 121 122 async def invoke( 123 self, 124 input: InputType, 125 input_source: DataSource | None = None, 126 ) -> TaskRun: 127 run_output, _ = await self.invoke_returning_run_output(input, input_source) 128 return run_output 129 130 async def _run_returning_run_output( 131 self, 132 input: InputType, 133 input_source: DataSource | None = None, 134 ) -> Tuple[TaskRun, RunOutput]: 135 # validate input, allowing arrays 136 if self.input_schema is not None: 137 validate_schema_with_value_error( 138 input, 139 self.input_schema, 140 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 141 require_object=False, 142 ) 143 144 # Format model input for model call (we save the original input in the task without formatting) 145 formatted_input = input 146 formatter_id = self.model_provider().formatter 147 if formatter_id is not None: 148 formatter = request_formatter_from_id(formatter_id) 149 formatted_input = formatter.format_input(input) 150 151 # Run 152 run_output, usage = await self._run(formatted_input) 153 154 # Parse 155 provider = self.model_provider() 156 parser = model_parser_from_id(provider.parser) 157 parsed_output = parser.parse_output(original_output=run_output) 158 159 # validate output 160 if self.output_schema is not None: 161 # Parse json to dict if we have structured output 162 if isinstance(parsed_output.output, str): 163 parsed_output.output = parse_json_string(parsed_output.output) 164 165 if not isinstance(parsed_output.output, dict): 166 raise RuntimeError( 167 f"structured response is not a dict: {parsed_output.output}" 168 ) 169 validate_schema_with_value_error( 170 parsed_output.output, 171 self.output_schema, 172 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 173 ) 174 else: 175 if not isinstance(parsed_output.output, str): 176 raise RuntimeError( 177 f"response is not a string for non-structured task: {parsed_output.output}" 178 ) 179 180 # Validate reasoning content is present and required 181 # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini). 
182 trace_has_toolcalls = parsed_output.trace is not None and any( 183 message.get("role", None) == "tool" for message in parsed_output.trace 184 ) 185 if ( 186 provider.reasoning_capable 187 and ( 188 not parsed_output.intermediate_outputs 189 or "reasoning" not in parsed_output.intermediate_outputs 190 ) 191 and not ( 192 provider.reasoning_optional_for_structured_output 193 and self.has_structured_output() 194 ) 195 and not (trace_has_toolcalls) 196 ): 197 raise RuntimeError( 198 "Reasoning is required for this model, but no reasoning was returned." 199 ) 200 201 # Generate the run and output 202 run = self.generate_run( 203 input, input_source, parsed_output, usage, run_output.trace 204 ) 205 206 # Save the run if configured to do so, and we have a path to save to 207 if ( 208 self.base_adapter_config.allow_saving 209 and Config.shared().autosave_runs 210 and self.task.path is not None 211 ): 212 run.save_to_file() 213 else: 214 # Clear the ID to indicate it's not persisted 215 run.id = None 216 217 return run, run_output 218 219 async def invoke_returning_run_output( 220 self, 221 input: InputType, 222 input_source: DataSource | None = None, 223 ) -> Tuple[TaskRun, RunOutput]: 224 # Determine if this is the root agent (no existing run context) 225 is_root_agent = get_agent_run_id() is None 226 227 if is_root_agent: 228 run_id = generate_agent_run_id() 229 set_agent_run_id(run_id) 230 231 try: 232 return await self._run_returning_run_output(input, input_source) 233 finally: 234 if is_root_agent: 235 try: 236 run_id = get_agent_run_id() 237 if run_id: 238 await MCPSessionManager.shared().cleanup_session(run_id) 239 finally: 240 clear_agent_run_id() 241 242 def has_structured_output(self) -> bool: 243 return self.output_schema is not None 244 245 @abstractmethod 246 def adapter_name(self) -> str: 247 pass 248 249 @abstractmethod 250 async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]: 251 pass 252 253 def build_prompt(self) -> str: 254 if 
self.prompt_builder is None: 255 raise ValueError("Prompt builder is not available for MCP run config") 256 # The prompt builder needs to know if we want to inject formatting instructions 257 structured_output_mode = as_kiln_agent_run_config( 258 self.run_config 259 ).structured_output_mode 260 add_json_instructions = self.has_structured_output() and ( 261 structured_output_mode == StructuredOutputMode.json_instructions 262 or structured_output_mode 263 == StructuredOutputMode.json_instruction_and_object 264 ) 265 266 return self.prompt_builder.build_prompt( 267 include_json_instructions=add_json_instructions 268 ) 269 270 def build_chat_formatter(self, input: InputType) -> ChatFormatter: 271 if self.prompt_builder is None: 272 raise ValueError("Prompt builder is not available for MCP run config") 273 # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy. 274 275 cot_prompt = self.prompt_builder.chain_of_thought_prompt() 276 system_message = self.build_prompt() 277 278 # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt. 279 if not cot_prompt: 280 return get_chat_formatter( 281 strategy=ChatStrategy.single_turn, 282 system_message=system_message, 283 user_input=input, 284 ) 285 286 # Some models like finetunes are trained with a specific chat strategy. Use that. 287 # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy. 
288 tuned_chat_strategy = self.model_provider().tuned_chat_strategy 289 if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn: 290 return get_chat_formatter( 291 strategy=tuned_chat_strategy, 292 system_message=system_message, 293 user_input=input, 294 thinking_instructions=cot_prompt, 295 ) 296 297 # Pick the best chat strategy for the model given it has a cot prompt. 298 reasoning_capable = self.model_provider().reasoning_capable 299 if reasoning_capable: 300 # "Thinking" LLM designed to output thinking in a structured format. We'll use it's native format. 301 # A simple message with the COT prompt appended to the message list is sufficient 302 return get_chat_formatter( 303 strategy=ChatStrategy.single_turn_r1_thinking, 304 system_message=system_message, 305 user_input=input, 306 thinking_instructions=cot_prompt, 307 ) 308 else: 309 # Unstructured output with COT 310 # Two calls to separate the thinking from the final response 311 return get_chat_formatter( 312 strategy=ChatStrategy.two_message_cot, 313 system_message=system_message, 314 user_input=input, 315 thinking_instructions=cot_prompt, 316 ) 317 318 # create a run and task output 319 def generate_run( 320 self, 321 input: InputType, 322 input_source: DataSource | None, 323 run_output: RunOutput, 324 usage: Usage | None = None, 325 trace: list[ChatCompletionMessageParam] | None = None, 326 ) -> TaskRun: 327 # Convert input and output to JSON strings if they aren't strings 328 input_str = ( 329 input if isinstance(input, str) else json.dumps(input, ensure_ascii=False) 330 ) 331 output_str = ( 332 json.dumps(run_output.output, ensure_ascii=False) 333 if isinstance(run_output.output, dict) 334 else run_output.output 335 ) 336 337 # If no input source is provided, use the human data source 338 if input_source is None: 339 input_source = DataSource( 340 type=DataSourceType.human, 341 properties={"created_by": Config.shared().user_id}, 342 ) 343 344 # Synthetic since an adapter, not a human, 
is creating this 345 # Special case for MCP run configs which calls a mcp tool 346 output_source_type = ( 347 DataSourceType.tool_call 348 if self.run_config.type == "mcp" 349 else DataSourceType.synthetic 350 ) 351 352 new_task_run = TaskRun( 353 parent=self.task, 354 input=input_str, 355 input_source=input_source, 356 output=TaskOutput( 357 output=output_str, 358 source=DataSource( 359 type=output_source_type, 360 properties=self._properties_for_task_output(), 361 run_config=self.run_config, 362 ), 363 ), 364 intermediate_outputs=run_output.intermediate_outputs, 365 tags=self.base_adapter_config.default_tags or [], 366 usage=usage, 367 trace=trace, 368 ) 369 370 return new_task_run 371 372 def _properties_for_task_output(self) -> Dict[str, str | int | float]: 373 match self.run_config.type: 374 case "mcp": 375 return {} 376 case "kiln_agent": 377 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 378 raise ValueError("Kiln agent run config is required") 379 run_config = self.run_config 380 381 props: Dict[str, str | int | float] = {} 382 props["adapter_name"] = self.adapter_name() 383 # Legacy properties where we save the run_config details into custom properties. 384 # These are now also be saved in the run_config field. 385 props["model_name"] = run_config.model_name 386 props["model_provider"] = run_config.model_provider_name 387 props["prompt_id"] = run_config.prompt_id 388 props["structured_output_mode"] = run_config.structured_output_mode 389 props["temperature"] = run_config.temperature 390 props["top_p"] = run_config.top_p 391 392 return props 393 case _: 394 raise_exhaustive_enum_error(self.run_config.type) 395 396 def update_run_config_unknown_structured_output_mode(self) -> None: 397 if self.run_config.type != "kiln_agent": 398 return 399 run_config = as_kiln_agent_run_config(self.run_config) 400 structured_output_mode = run_config.structured_output_mode 401 402 # Old datamodels didn't save the structured output mode. 
Some clients (tests, end users) might not set it. 403 # Look up our recommended mode from ml_model_list if we have one 404 if structured_output_mode == StructuredOutputMode.unknown: 405 new_run_config = run_config.model_copy(deep=True) 406 structured_output_mode = default_structured_output_mode_for_model_provider( 407 run_config.model_name, 408 run_config.model_provider_name, 409 ) 410 new_run_config.structured_output_mode = structured_output_mode 411 self.run_config = new_run_config 412 413 async def available_tools(self) -> list[KilnToolInterface]: 414 if self.run_config.type != "kiln_agent": 415 return [] 416 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 417 if tool_config is None or tool_config.tools is None: 418 return [] 419 420 project = self.task.parent_project() 421 if project is None: 422 raise ValueError("Task must have a parent project to resolve tools") 423 424 project_id = project.id 425 if project_id is None: 426 raise ValueError("Project must have an ID to resolve tools") 427 428 tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools] 429 430 # Check each tool has a unique name 431 tool_names = [await tool.name() for tool in tools] 432 if len(tool_names) != len(set(tool_names)): 433 raise ValueError( 434 "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined." 435 ) 436 437 return tools
@dataclass
class
AdapterConfig:
@dataclass
class AdapterConfig:
    """
    An adapter config is config options that do NOT impact the output of the model.

    For example: whether it's saved, or if we request additional data like logprobs.
    """

    allow_saving: bool = True
    top_logprobs: int | None = None
    default_tags: list[str] | None = None

    # A custom prompt builder can be injected to override the system prompt building
    # process. If not provided, the prompt builder will be created from the
    # run_config.prompt_id which may load additional files from disk.
    # (Previously documented with a no-op bare string expression; converted to a
    # real comment.)
    prompt_builder: BasePromptBuilder | None = None
An adapter config holds options that do NOT impact the output of the model.
For example: whether it's saved, or if we request additional data like logprobs.
AdapterConfig( allow_saving: bool = True, top_logprobs: int | None = None, default_tags: list[str] | None = None, prompt_builder: kiln_ai.adapters.prompt_builders.BasePromptBuilder | None = None)
class
BaseAdapter:
70class BaseAdapter(metaclass=ABCMeta): 71 """Base class for AI model adapters that handle task execution. 72 73 This abstract class provides the foundation for implementing model-specific adapters 74 that can process tasks with structured or unstructured inputs/outputs. It handles 75 input/output validation, prompt building, and run tracking. 76 77 Prompt building is handled internally by the adapter, which uses a prompt builder 78 based on the run config. To override the prompt building behavior, pass a custom prompt 79 builder to the adapter config. 80 """ 81 82 def __init__( 83 self, 84 task: Task, 85 run_config: RunConfigProperties, 86 config: AdapterConfig | None = None, 87 ): 88 self.task = task 89 self.run_config: RunConfigProperties = run_config 90 self.base_adapter_config = config or AdapterConfig() 91 92 if isinstance(run_config, KilnAgentRunConfigProperties): 93 self.update_run_config_unknown_structured_output_mode() 94 self.prompt_builder = ( 95 self.base_adapter_config.prompt_builder 96 or prompt_builder_from_id(run_config.prompt_id, task) 97 ) 98 else: 99 self.prompt_builder = None 100 self._model_provider: KilnModelProvider | None = None 101 102 self.output_schema = task.output_json_schema 103 self.input_schema = task.input_json_schema 104 105 def model_provider(self) -> KilnModelProvider: 106 """ 107 Lazy load the model provider for this adapter. 
108 """ 109 if self._model_provider is not None: 110 return self._model_provider 111 run_config = as_kiln_agent_run_config(self.run_config) 112 if not run_config.model_name or not run_config.model_provider_name: 113 raise ValueError("model_name and model_provider_name must be provided") 114 self._model_provider = kiln_model_provider_from( 115 run_config.model_name, run_config.model_provider_name 116 ) 117 if not self._model_provider: 118 raise ValueError( 119 f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}" 120 ) 121 return self._model_provider 122 123 async def invoke( 124 self, 125 input: InputType, 126 input_source: DataSource | None = None, 127 ) -> TaskRun: 128 run_output, _ = await self.invoke_returning_run_output(input, input_source) 129 return run_output 130 131 async def _run_returning_run_output( 132 self, 133 input: InputType, 134 input_source: DataSource | None = None, 135 ) -> Tuple[TaskRun, RunOutput]: 136 # validate input, allowing arrays 137 if self.input_schema is not None: 138 validate_schema_with_value_error( 139 input, 140 self.input_schema, 141 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 142 require_object=False, 143 ) 144 145 # Format model input for model call (we save the original input in the task without formatting) 146 formatted_input = input 147 formatter_id = self.model_provider().formatter 148 if formatter_id is not None: 149 formatter = request_formatter_from_id(formatter_id) 150 formatted_input = formatter.format_input(input) 151 152 # Run 153 run_output, usage = await self._run(formatted_input) 154 155 # Parse 156 provider = self.model_provider() 157 parser = model_parser_from_id(provider.parser) 158 parsed_output = parser.parse_output(original_output=run_output) 159 160 # validate output 161 if self.output_schema is not None: 162 # Parse json to dict if we have structured output 163 if isinstance(parsed_output.output, str): 164 parsed_output.output = parse_json_string(parsed_output.output) 165 166 if not isinstance(parsed_output.output, dict): 167 raise RuntimeError( 168 f"structured response is not a dict: {parsed_output.output}" 169 ) 170 validate_schema_with_value_error( 171 parsed_output.output, 172 self.output_schema, 173 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 174 ) 175 else: 176 if not isinstance(parsed_output.output, str): 177 raise RuntimeError( 178 f"response is not a string for non-structured task: {parsed_output.output}" 179 ) 180 181 # Validate reasoning content is present and required 182 # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini). 
183 trace_has_toolcalls = parsed_output.trace is not None and any( 184 message.get("role", None) == "tool" for message in parsed_output.trace 185 ) 186 if ( 187 provider.reasoning_capable 188 and ( 189 not parsed_output.intermediate_outputs 190 or "reasoning" not in parsed_output.intermediate_outputs 191 ) 192 and not ( 193 provider.reasoning_optional_for_structured_output 194 and self.has_structured_output() 195 ) 196 and not (trace_has_toolcalls) 197 ): 198 raise RuntimeError( 199 "Reasoning is required for this model, but no reasoning was returned." 200 ) 201 202 # Generate the run and output 203 run = self.generate_run( 204 input, input_source, parsed_output, usage, run_output.trace 205 ) 206 207 # Save the run if configured to do so, and we have a path to save to 208 if ( 209 self.base_adapter_config.allow_saving 210 and Config.shared().autosave_runs 211 and self.task.path is not None 212 ): 213 run.save_to_file() 214 else: 215 # Clear the ID to indicate it's not persisted 216 run.id = None 217 218 return run, run_output 219 220 async def invoke_returning_run_output( 221 self, 222 input: InputType, 223 input_source: DataSource | None = None, 224 ) -> Tuple[TaskRun, RunOutput]: 225 # Determine if this is the root agent (no existing run context) 226 is_root_agent = get_agent_run_id() is None 227 228 if is_root_agent: 229 run_id = generate_agent_run_id() 230 set_agent_run_id(run_id) 231 232 try: 233 return await self._run_returning_run_output(input, input_source) 234 finally: 235 if is_root_agent: 236 try: 237 run_id = get_agent_run_id() 238 if run_id: 239 await MCPSessionManager.shared().cleanup_session(run_id) 240 finally: 241 clear_agent_run_id() 242 243 def has_structured_output(self) -> bool: 244 return self.output_schema is not None 245 246 @abstractmethod 247 def adapter_name(self) -> str: 248 pass 249 250 @abstractmethod 251 async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]: 252 pass 253 254 def build_prompt(self) -> str: 255 if 
self.prompt_builder is None: 256 raise ValueError("Prompt builder is not available for MCP run config") 257 # The prompt builder needs to know if we want to inject formatting instructions 258 structured_output_mode = as_kiln_agent_run_config( 259 self.run_config 260 ).structured_output_mode 261 add_json_instructions = self.has_structured_output() and ( 262 structured_output_mode == StructuredOutputMode.json_instructions 263 or structured_output_mode 264 == StructuredOutputMode.json_instruction_and_object 265 ) 266 267 return self.prompt_builder.build_prompt( 268 include_json_instructions=add_json_instructions 269 ) 270 271 def build_chat_formatter(self, input: InputType) -> ChatFormatter: 272 if self.prompt_builder is None: 273 raise ValueError("Prompt builder is not available for MCP run config") 274 # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy. 275 276 cot_prompt = self.prompt_builder.chain_of_thought_prompt() 277 system_message = self.build_prompt() 278 279 # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt. 280 if not cot_prompt: 281 return get_chat_formatter( 282 strategy=ChatStrategy.single_turn, 283 system_message=system_message, 284 user_input=input, 285 ) 286 287 # Some models like finetunes are trained with a specific chat strategy. Use that. 288 # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy. 
289 tuned_chat_strategy = self.model_provider().tuned_chat_strategy 290 if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn: 291 return get_chat_formatter( 292 strategy=tuned_chat_strategy, 293 system_message=system_message, 294 user_input=input, 295 thinking_instructions=cot_prompt, 296 ) 297 298 # Pick the best chat strategy for the model given it has a cot prompt. 299 reasoning_capable = self.model_provider().reasoning_capable 300 if reasoning_capable: 301 # "Thinking" LLM designed to output thinking in a structured format. We'll use it's native format. 302 # A simple message with the COT prompt appended to the message list is sufficient 303 return get_chat_formatter( 304 strategy=ChatStrategy.single_turn_r1_thinking, 305 system_message=system_message, 306 user_input=input, 307 thinking_instructions=cot_prompt, 308 ) 309 else: 310 # Unstructured output with COT 311 # Two calls to separate the thinking from the final response 312 return get_chat_formatter( 313 strategy=ChatStrategy.two_message_cot, 314 system_message=system_message, 315 user_input=input, 316 thinking_instructions=cot_prompt, 317 ) 318 319 # create a run and task output 320 def generate_run( 321 self, 322 input: InputType, 323 input_source: DataSource | None, 324 run_output: RunOutput, 325 usage: Usage | None = None, 326 trace: list[ChatCompletionMessageParam] | None = None, 327 ) -> TaskRun: 328 # Convert input and output to JSON strings if they aren't strings 329 input_str = ( 330 input if isinstance(input, str) else json.dumps(input, ensure_ascii=False) 331 ) 332 output_str = ( 333 json.dumps(run_output.output, ensure_ascii=False) 334 if isinstance(run_output.output, dict) 335 else run_output.output 336 ) 337 338 # If no input source is provided, use the human data source 339 if input_source is None: 340 input_source = DataSource( 341 type=DataSourceType.human, 342 properties={"created_by": Config.shared().user_id}, 343 ) 344 345 # Synthetic since an adapter, not a human, 
is creating this 346 # Special case for MCP run configs which calls a mcp tool 347 output_source_type = ( 348 DataSourceType.tool_call 349 if self.run_config.type == "mcp" 350 else DataSourceType.synthetic 351 ) 352 353 new_task_run = TaskRun( 354 parent=self.task, 355 input=input_str, 356 input_source=input_source, 357 output=TaskOutput( 358 output=output_str, 359 source=DataSource( 360 type=output_source_type, 361 properties=self._properties_for_task_output(), 362 run_config=self.run_config, 363 ), 364 ), 365 intermediate_outputs=run_output.intermediate_outputs, 366 tags=self.base_adapter_config.default_tags or [], 367 usage=usage, 368 trace=trace, 369 ) 370 371 return new_task_run 372 373 def _properties_for_task_output(self) -> Dict[str, str | int | float]: 374 match self.run_config.type: 375 case "mcp": 376 return {} 377 case "kiln_agent": 378 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 379 raise ValueError("Kiln agent run config is required") 380 run_config = self.run_config 381 382 props: Dict[str, str | int | float] = {} 383 props["adapter_name"] = self.adapter_name() 384 # Legacy properties where we save the run_config details into custom properties. 385 # These are now also be saved in the run_config field. 386 props["model_name"] = run_config.model_name 387 props["model_provider"] = run_config.model_provider_name 388 props["prompt_id"] = run_config.prompt_id 389 props["structured_output_mode"] = run_config.structured_output_mode 390 props["temperature"] = run_config.temperature 391 props["top_p"] = run_config.top_p 392 393 return props 394 case _: 395 raise_exhaustive_enum_error(self.run_config.type) 396 397 def update_run_config_unknown_structured_output_mode(self) -> None: 398 if self.run_config.type != "kiln_agent": 399 return 400 run_config = as_kiln_agent_run_config(self.run_config) 401 structured_output_mode = run_config.structured_output_mode 402 403 # Old datamodels didn't save the structured output mode. 
Some clients (tests, end users) might not set it. 404 # Look up our recommended mode from ml_model_list if we have one 405 if structured_output_mode == StructuredOutputMode.unknown: 406 new_run_config = run_config.model_copy(deep=True) 407 structured_output_mode = default_structured_output_mode_for_model_provider( 408 run_config.model_name, 409 run_config.model_provider_name, 410 ) 411 new_run_config.structured_output_mode = structured_output_mode 412 self.run_config = new_run_config 413 414 async def available_tools(self) -> list[KilnToolInterface]: 415 if self.run_config.type != "kiln_agent": 416 return [] 417 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 418 if tool_config is None or tool_config.tools is None: 419 return [] 420 421 project = self.task.parent_project() 422 if project is None: 423 raise ValueError("Task must have a parent project to resolve tools") 424 425 project_id = project.id 426 if project_id is None: 427 raise ValueError("Project must have an ID to resolve tools") 428 429 tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools] 430 431 # Check each tool has a unique name 432 tool_names = [await tool.name() for tool in tools] 433 if len(tool_names) != len(set(tool_names)): 434 raise ValueError( 435 "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined." 436 ) 437 438 return tools
Base class for AI model adapters that handle task execution.
This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.
Prompt building is handled internally by the adapter, which uses a prompt builder based on the run config. To override the prompt building behavior, pass a custom prompt builder to the adapter config.
run_config: Annotated[Union[Annotated[kiln_ai.datamodel.run_config.KilnAgentRunConfigProperties, Tag(tag='kiln_agent')], Annotated[kiln_ai.datamodel.run_config.McpRunConfigProperties, Tag(tag='mcp')]], Discriminator(discriminator=<function _get_run_config_type at 0x7f5e59c4de40>, custom_error_type=None, custom_error_message=None, custom_error_context=None)]
105 def model_provider(self) -> KilnModelProvider: 106 """ 107 Lazy load the model provider for this adapter. 108 """ 109 if self._model_provider is not None: 110 return self._model_provider 111 run_config = as_kiln_agent_run_config(self.run_config) 112 if not run_config.model_name or not run_config.model_provider_name: 113 raise ValueError("model_name and model_provider_name must be provided") 114 self._model_provider = kiln_model_provider_from( 115 run_config.model_name, run_config.model_provider_name 116 ) 117 if not self._model_provider: 118 raise ValueError( 119 f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}" 120 ) 121 return self._model_provider
Lazy load the model provider for this adapter.
async def
invoke( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
async def
invoke_returning_run_output( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
async def invoke_returning_run_output(
    self,
    input: InputType,
    input_source: DataSource | None = None,
) -> Tuple[TaskRun, RunOutput]:
    """Run the task, returning both the TaskRun and the raw RunOutput.

    The outermost (root) invocation owns the agent-run lifecycle: it creates
    the agent run id before running and, afterwards, tears down any MCP
    session keyed to that id and clears the id — even when the run raises.
    Nested invocations reuse the existing context and perform no cleanup.
    """
    # No id in context means we are the root agent for this invocation.
    owns_run_context = get_agent_run_id() is None
    if owns_run_context:
        set_agent_run_id(generate_agent_run_id())

    try:
        return await self._run_returning_run_output(input, input_source)
    finally:
        if owns_run_context:
            try:
                # Re-read the id from context rather than trusting a local:
                # the context value is what MCP sessions were keyed on.
                current_id = get_agent_run_id()
                if current_id:
                    await MCPSessionManager.shared().cleanup_session(current_id)
            finally:
                # Always clear the context id, even if cleanup raised.
                clear_agent_run_id()
def
build_prompt(self) -> str:
def build_prompt(self) -> str:
    """Build the system prompt for this run.

    Appends JSON formatting instructions only when structured output is
    requested AND the selected mode relies on prompt-level instructions
    (rather than API-level structured output features).

    Raises:
        ValueError: if no prompt builder is available (MCP run configs).
    """
    if self.prompt_builder is None:
        raise ValueError("Prompt builder is not available for MCP run config")

    mode = as_kiln_agent_run_config(self.run_config).structured_output_mode
    # These two modes steer the model toward JSON via the prompt itself.
    prompt_driven_modes = (
        StructuredOutputMode.json_instructions,
        StructuredOutputMode.json_instruction_and_object,
    )
    include_json = self.has_structured_output() and mode in prompt_driven_modes

    return self.prompt_builder.build_prompt(include_json_instructions=include_json)
def
build_chat_formatter( self, input: Union[Dict[str, Any], List[Any], str]) -> kiln_ai.adapters.chat.ChatFormatter:
def build_chat_formatter(self, input: InputType) -> ChatFormatter:
    """Select and construct the chat formatter for this run.

    Strategy selection considers, in order: whether the chosen prompt has a
    chain-of-thought (COT) component, any strategy the model was fine-tuned
    with, and whether the model natively emits structured "thinking" output.

    Raises:
        ValueError: if no prompt builder is available (MCP run configs).
    """
    if self.prompt_builder is None:
        raise ValueError("Prompt builder is not available for MCP run config")

    cot_prompt = self.prompt_builder.chain_of_thought_prompt()
    system_message = self.build_prompt()

    # Without a COT prompt there is nothing multi-turn to do: single turn,
    # even if a tuned strategy exists — a tuned strategy is either already
    # single turn, or unusable without a COT prompt.
    if not cot_prompt:
        return get_chat_formatter(
            strategy=ChatStrategy.single_turn,
            system_message=system_message,
            user_input=input,
        )

    # Honor a fine-tuned strategy when present — unless it is single turn:
    # the user explicitly selected a COT prompt, and explicit prompt
    # selection takes priority over the tuned default.
    tuned = self.model_provider().tuned_chat_strategy
    if tuned and tuned != ChatStrategy.single_turn:
        return get_chat_formatter(
            strategy=tuned,
            system_message=system_message,
            user_input=input,
            thinking_instructions=cot_prompt,
        )

    if self.model_provider().reasoning_capable:
        # "Thinking" models emit reasoning in their own structured format;
        # a single message with the COT prompt appended is sufficient.
        return get_chat_formatter(
            strategy=ChatStrategy.single_turn_r1_thinking,
            system_message=system_message,
            user_input=input,
            thinking_instructions=cot_prompt,
        )

    # Plain model + COT prompt: split thinking from the final answer
    # across two calls.
    return get_chat_formatter(
        strategy=ChatStrategy.two_message_cot,
        system_message=system_message,
        user_input=input,
        thinking_instructions=cot_prompt,
    )
def
generate_run( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, run_output: kiln_ai.adapters.run_output.RunOutput, usage: kiln_ai.datamodel.Usage | None = None, trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None) -> kiln_ai.datamodel.TaskRun:
def generate_run(
    self,
    input: InputType,
    input_source: DataSource | None,
    run_output: RunOutput,
    usage: Usage | None = None,
    trace: list[ChatCompletionMessageParam] | None = None,
) -> TaskRun:
    """Assemble a TaskRun datamodel object from a completed model invocation.

    Non-string inputs and outputs are serialized to JSON. When no input
    source is given, the input is attributed to the current (human) user.
    """
    # Serialize structured input/output; plain strings pass through as-is.
    if isinstance(input, str):
        input_str = input
    else:
        input_str = json.dumps(input, ensure_ascii=False)

    raw_output = run_output.output
    if isinstance(raw_output, dict):
        output_str = json.dumps(raw_output, ensure_ascii=False)
    else:
        output_str = raw_output

    # Default attribution: a human (the current user) supplied the input.
    if input_source is None:
        input_source = DataSource(
            type=DataSourceType.human,
            properties={"created_by": Config.shared().user_id},
        )

    # The output is synthetic (adapter-generated, not human), except for MCP
    # run configs, which invoke an MCP tool and are recorded as tool calls.
    if self.run_config.type == "mcp":
        output_source_type = DataSourceType.tool_call
    else:
        output_source_type = DataSourceType.synthetic

    return TaskRun(
        parent=self.task,
        input=input_str,
        input_source=input_source,
        output=TaskOutput(
            output=output_str,
            source=DataSource(
                type=output_source_type,
                properties=self._properties_for_task_output(),
                run_config=self.run_config,
            ),
        ),
        intermediate_outputs=run_output.intermediate_outputs,
        tags=self.base_adapter_config.default_tags or [],
        usage=usage,
        trace=trace,
    )
def
update_run_config_unknown_structured_output_mode(self) -> None:
def update_run_config_unknown_structured_output_mode(self) -> None:
    """Backfill structured_output_mode when it is set to 'unknown'.

    Older datamodels did not persist a structured output mode, and some
    clients (tests, end users) never set one. In that case, replace the run
    config with a deep copy carrying the recommended mode for this
    model/provider from ml_model_list.
    """
    if self.run_config.type != "kiln_agent":
        return

    agent_config = as_kiln_agent_run_config(self.run_config)
    if agent_config.structured_output_mode != StructuredOutputMode.unknown:
        return

    # Copy rather than mutate in place, then swap the whole config.
    updated = agent_config.model_copy(deep=True)
    updated.structured_output_mode = default_structured_output_mode_for_model_provider(
        agent_config.model_name,
        agent_config.model_provider_name,
    )
    self.run_config = updated
async def available_tools(self) -> list[KilnToolInterface]:
    """Resolve the tools configured on this run config into tool instances.

    Returns an empty list for non-agent run configs or when no tools are
    configured. Validates that the task belongs to a project with an id
    (tool ids are resolved in project scope) and that the resolved tools
    have unique names.

    Raises:
        ValueError: if the task has no parent project, the project has no
            id, or two configured tools share a name.
    """
    if self.run_config.type != "kiln_agent":
        return []
    tool_config = as_kiln_agent_run_config(self.run_config).tools_config
    if tool_config is None or tool_config.tools is None:
        return []

    # Tool ids are resolved in the context of the task's project.
    project = self.task.parent_project()
    if project is None:
        raise ValueError("Task must have a parent project to resolve tools")
    # Check the id directly instead of binding an unused local.
    if project.id is None:
        raise ValueError("Project must have an ID to resolve tools")

    tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]

    # Duplicate tool names make model tool selection and dispatch ambiguous.
    tool_names = [await tool.name() for tool in tools]
    if len(tool_names) != len(set(tool_names)):
        raise ValueError(
            "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
        )

    return tools