kiln_ai.adapters.model_adapters.base_adapter
import json
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, Tuple

from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter
from kiln_ai.adapters.ml_model_list import (
    KilnModelProvider,
    StructuredOutputMode,
    default_structured_output_mode_for_model_provider,
)
from kiln_ai.adapters.parsers.json_parser import parse_json_string
from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
from kiln_ai.adapters.prompt_builders import prompt_builder_from_id
from kiln_ai.adapters.provider_tools import kiln_model_provider_from
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import (
    DataSource,
    DataSourceType,
    Task,
    TaskOutput,
    TaskRun,
    Usage,
)
from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
from kiln_ai.datamodel.task import RunConfigProperties
from kiln_ai.tools import KilnToolInterface
from kiln_ai.tools.tool_registry import tool_from_id
from kiln_ai.utils.config import Config
from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam


@dataclass
class AdapterConfig:
    """
    An adapter config is config options that do NOT impact the output of the model.

    For example: if it's saved, or if we request additional data like logprobs.
    """

    allow_saving: bool = True
    top_logprobs: int | None = None
    default_tags: list[str] | None = None


class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.
    """

    def __init__(
        self,
        task: Task,
        run_config: RunConfigProperties,
        config: AdapterConfig | None = None,
    ):
        self.task = task
        self.run_config = run_config
        self.update_run_config_unknown_structured_output_mode()
        self.prompt_builder = prompt_builder_from_id(run_config.prompt_id, task)
        self._model_provider: KilnModelProvider | None = None

        self.output_schema = task.output_json_schema
        self.input_schema = task.input_json_schema
        self.base_adapter_config = config or AdapterConfig()

    def model_provider(self) -> KilnModelProvider:
        """
        Lazy load the model provider for this adapter.
        """
        if self._model_provider is not None:
            return self._model_provider
        if not self.run_config.model_name or not self.run_config.model_provider_name:
            raise ValueError("model_name and model_provider_name must be provided")
        self._model_provider = kiln_model_provider_from(
            self.run_config.model_name, self.run_config.model_provider_name
        )
        if not self._model_provider:
            raise ValueError(
                f"model_provider_name {self.run_config.model_provider_name} not found for model {self.run_config.model_name}"
            )
        return self._model_provider

    async def invoke(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        run_output, _ = await self.invoke_returning_run_output(input, input_source)
        return run_output

    async def invoke_returning_run_output(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        # validate input, allowing arrays
        if self.input_schema is not None:
            validate_schema_with_value_error(
                input,
                self.input_schema,
                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
                require_object=False,
            )

        # Format model input for model call (we save the original input in the task without formatting)
        formatted_input = input
        formatter_id = self.model_provider().formatter
        if formatter_id is not None:
            formatter = request_formatter_from_id(formatter_id)
            formatted_input = formatter.format_input(input)

        # Run
        run_output, usage = await self._run(formatted_input)

        # Parse
        provider = self.model_provider()
        parser = model_parser_from_id(provider.parser)
        parsed_output = parser.parse_output(original_output=run_output)

        # validate output
        if self.output_schema is not None:
            # Parse json to dict if we have structured output
            if isinstance(parsed_output.output, str):
                parsed_output.output = parse_json_string(parsed_output.output)

            if not isinstance(parsed_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {parsed_output.output}"
                )
            validate_schema_with_value_error(
                parsed_output.output,
                self.output_schema,
                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )
        else:
            if not isinstance(parsed_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {parsed_output.output}"
                )

        # Validate reasoning content is present and required
        # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini).
        trace_has_toolcalls = parsed_output.trace is not None and any(
            message.get("role", None) == "tool" for message in parsed_output.trace
        )
        if (
            provider.reasoning_capable
            and (
                not parsed_output.intermediate_outputs
                or "reasoning" not in parsed_output.intermediate_outputs
            )
            and not (
                provider.reasoning_optional_for_structured_output
                and self.has_structured_output()
            )
            and not (trace_has_toolcalls)
        ):
            raise RuntimeError(
                "Reasoning is required for this model, but no reasoning was returned."
            )

        # Generate the run and output
        run = self.generate_run(
            input, input_source, parsed_output, usage, run_output.trace
        )

        # Save the run if configured to do so, and we have a path to save to
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run, run_output

    def has_structured_output(self) -> bool:
        return self.output_schema is not None

    @abstractmethod
    def adapter_name(self) -> str:
        pass

    @abstractmethod
    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
        pass

    def build_prompt(self) -> str:
        # The prompt builder needs to know if we want to inject formatting instructions
        structured_output_mode = self.run_config.structured_output_mode
        add_json_instructions = self.has_structured_output() and (
            structured_output_mode == StructuredOutputMode.json_instructions
            or structured_output_mode
            == StructuredOutputMode.json_instruction_and_object
        )

        return self.prompt_builder.build_prompt(
            include_json_instructions=add_json_instructions
        )

    def build_chat_formatter(self, input: InputType) -> ChatFormatter:
        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and whether the model was finetuned with a specific chat strategy.

        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
        system_message = self.build_prompt()

        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
        if not cot_prompt:
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn,
                system_message=system_message,
                user_input=input,
            )

        # Some models like finetunes are trained with a specific chat strategy. Use that.
        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
            return get_chat_formatter(
                strategy=tuned_chat_strategy,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

        # Pick the best chat strategy for the model given it has a COT prompt.
        reasoning_capable = self.model_provider().reasoning_capable
        if reasoning_capable:
            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
            # A simple message with the COT prompt appended to the message list is sufficient
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn_r1_thinking,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
        else:
            # Unstructured output with COT
            # Two calls to separate the thinking from the final response
            return get_chat_formatter(
                strategy=ChatStrategy.two_message_cot,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

    # create a run and task output
    def generate_run(
        self,
        input: InputType,
        input_source: DataSource | None,
        run_output: RunOutput,
        usage: Usage | None = None,
        trace: list[ChatCompletionMessageParam] | None = None,
    ) -> TaskRun:
        # Convert input and output to JSON strings if they aren't strings
        input_str = (
            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
        )
        output_str = (
            json.dumps(run_output.output, ensure_ascii=False)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        new_task_run = TaskRun(
            parent=self.task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                # Synthetic since an adapter, not a human, is creating this
                source=DataSource(
                    type=DataSourceType.synthetic,
                    properties=self._properties_for_task_output(),
                    run_config=self.run_config,
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.base_adapter_config.default_tags or [],
            usage=usage,
            trace=trace,
        )

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        props = {}

        props["adapter_name"] = self.adapter_name()

        # Legacy properties where we save the run_config details into custom properties.
        # These are now also saved in the run_config field.
        props["model_name"] = self.run_config.model_name
        props["model_provider"] = self.run_config.model_provider_name
        props["prompt_id"] = self.run_config.prompt_id
        props["structured_output_mode"] = self.run_config.structured_output_mode
        props["temperature"] = self.run_config.temperature
        props["top_p"] = self.run_config.top_p

        return props

    def update_run_config_unknown_structured_output_mode(self) -> None:
        structured_output_mode = self.run_config.structured_output_mode

        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
        # Look up our recommended mode from ml_model_list if we have one
        if structured_output_mode == StructuredOutputMode.unknown:
            new_run_config = self.run_config.model_copy(deep=True)
            structured_output_mode = default_structured_output_mode_for_model_provider(
                self.run_config.model_name,
                self.run_config.model_provider_name,
            )
            new_run_config.structured_output_mode = structured_output_mode
            self.run_config = new_run_config

    async def available_tools(self) -> list[KilnToolInterface]:
        tool_config = self.run_config.tools_config
        if tool_config is None or tool_config.tools is None:
            return []

        project = self.task.parent_project()
        if project is None:
            raise ValueError("Task must have a parent project to resolve tools")

        project_id = project.id
        if project_id is None:
            raise ValueError("Project must have an ID to resolve tools")

        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]

        # Check each tool has a unique name
        tool_names = [await tool.name() for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            raise ValueError(
                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
            )

        return tools
@dataclass
class AdapterConfig:
An adapter config is config options that do NOT impact the output of the model.
For example: if it's saved, or if we request additional data like logprobs.
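For illustration, a minimal sketch of constructing an AdapterConfig; the values are hypothetical, the field names are the dataclass fields above:

    from kiln_ai.adapters.model_adapters.base_adapter import AdapterConfig

    # Disable persistence, request top-5 logprobs, and tag every generated run.
    config = AdapterConfig(
        allow_saving=False,
        top_logprobs=5,
        default_tags=["experiment"],
    )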
class BaseAdapter:
Base class for AI model adapters that handle task execution.
This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.
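As a rough sketch of the contract, a concrete adapter only needs to implement adapter_name() and _run(). This example assumes RunOutput can be constructed with output and intermediate_outputs keyword arguments, and it simply echoes the input rather than calling a real model:

    from typing import Tuple

    from kiln_ai.adapters.model_adapters.base_adapter import BaseAdapter
    from kiln_ai.adapters.run_output import RunOutput
    from kiln_ai.datamodel import Usage
    from kiln_ai.datamodel.datamodel_enums import InputType


    class EchoAdapter(BaseAdapter):
        # Toy adapter for illustration only: returns the formatted input unchanged.

        def adapter_name(self) -> str:
            return "echo_adapter"

        async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
            # A real adapter would build messages via self.build_chat_formatter(input)
            # and call the provider's API here.
            return RunOutput(output=str(input), intermediate_outputs=None), None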
def model_provider(self) -> kiln_ai.adapters.ml_model_list.KilnModelProvider:
Lazy load the model provider for this adapter.
async def invoke(self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
async def invoke_returning_run_output(self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
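A hedged end-to-end fragment using the EchoAdapter sketch above. Construction of the Task and RunConfigProperties objects is omitted (their required fields aren't shown in this module), so task and run_config are assumed to already exist:

    import asyncio

    # Assumed to exist: a Task (with or without input/output JSON schemas) and a
    # RunConfigProperties with valid model_name / model_provider_name / prompt_id.
    adapter = EchoAdapter(task, run_config)

    # invoke() validates the input against the task's input schema (if any),
    # calls _run(), parses and validates the output, then returns a TaskRun.
    task_run = asyncio.run(adapter.invoke("Summarize this document."))
    print(task_run.output.output)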
def build_prompt(self) -> str:
def build_chat_formatter(self, input: Union[Dict[str, Any], List[Any], str]) -> kiln_ai.adapters.chat.ChatFormatter:
def generate_run(
    self,
    input: Union[Dict[str, Any], List[Any], str],
    input_source: kiln_ai.datamodel.DataSource | None,
    run_output: kiln_ai.adapters.run_output.RunOutput,
    usage: kiln_ai.datamodel.Usage | None = None,
    trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None,
) -> kiln_ai.datamodel.TaskRun:
def update_run_config_unknown_structured_output_mode(self) -> None:
async def available_tools(self) -> list[kiln_ai.tools.KilnToolInterface]:
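A small hedged helper showing how the resolved tools might be inspected; it assumes the adapter's run_config.tools_config lists valid tool ids for the task's parent project:

    from kiln_ai.adapters.model_adapters.base_adapter import BaseAdapter


    async def list_tool_names(adapter: BaseAdapter) -> list[str]:
        # available_tools() resolves tool ids from run_config.tools_config and
        # raises if two tools share a name.
        tools = await adapter.available_tools()
        return [await tool.name() for tool in tools]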