kiln_ai.adapters.model_adapters.base_adapter
import json
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, Tuple

from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter
from kiln_ai.adapters.ml_model_list import (
    KilnModelProvider,
    StructuredOutputMode,
    default_structured_output_mode_for_model_provider,
)
from kiln_ai.adapters.parsers.json_parser import parse_json_string
from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
from kiln_ai.adapters.prompt_builders import prompt_builder_from_id
from kiln_ai.adapters.provider_tools import kiln_model_provider_from
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import (
    DataSource,
    DataSourceType,
    Task,
    TaskOutput,
    TaskRun,
    Usage,
)
from kiln_ai.datamodel.datamodel_enums import ChatStrategy
from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
from kiln_ai.datamodel.task import RunConfigProperties
from kiln_ai.tools import KilnToolInterface
from kiln_ai.tools.tool_registry import tool_from_id
from kiln_ai.utils.config import Config
from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam


@dataclass
class AdapterConfig:
    """
    An adapter config holds config options that do NOT impact the output of the model.

    For example: whether the run is saved, or whether we request additional data like logprobs.
    """

    allow_saving: bool = True
    top_logprobs: int | None = None
    default_tags: list[str] | None = None


class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.
    """

    def __init__(
        self,
        task: Task,
        run_config: RunConfigProperties,
        config: AdapterConfig | None = None,
    ):
        self.task = task
        self.run_config = run_config
        self.update_run_config_unknown_structured_output_mode()
        self.prompt_builder = prompt_builder_from_id(run_config.prompt_id, task)
        self._model_provider: KilnModelProvider | None = None

        self.output_schema = task.output_json_schema
        self.input_schema = task.input_json_schema
        self.base_adapter_config = config or AdapterConfig()

    def model_provider(self) -> KilnModelProvider:
        """
        Lazy load the model provider for this adapter.
        """
        if self._model_provider is not None:
            return self._model_provider
        if not self.run_config.model_name or not self.run_config.model_provider_name:
            raise ValueError("model_name and model_provider_name must be provided")
        self._model_provider = kiln_model_provider_from(
            self.run_config.model_name, self.run_config.model_provider_name
        )
        if not self._model_provider:
            raise ValueError(
                f"model_provider_name {self.run_config.model_provider_name} not found for model {self.run_config.model_name}"
            )
        return self._model_provider

    async def invoke(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        run_output, _ = await self.invoke_returning_run_output(input, input_source)
        return run_output

    async def invoke_returning_run_output(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        # validate input
        if self.input_schema is not None:
            if not isinstance(input, dict):
                raise ValueError(f"structured input is not a dict: {input}")

            validate_schema_with_value_error(
                input,
                self.input_schema,
                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )

        # Format model input for the model call (we save the original input in the task without formatting)
        formatted_input = input
        formatter_id = self.model_provider().formatter
        if formatter_id is not None:
            formatter = request_formatter_from_id(formatter_id)
            formatted_input = formatter.format_input(input)

        # Run
        run_output, usage = await self._run(formatted_input)

        # Parse
        provider = self.model_provider()
        parser = model_parser_from_id(provider.parser)
        parsed_output = parser.parse_output(original_output=run_output)

        # validate output
        if self.output_schema is not None:
            # Parse json to dict if we have structured output
            if isinstance(parsed_output.output, str):
                parsed_output.output = parse_json_string(parsed_output.output)

            if not isinstance(parsed_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {parsed_output.output}"
                )
            validate_schema_with_value_error(
                parsed_output.output,
                self.output_schema,
                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )
        else:
            if not isinstance(parsed_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {parsed_output.output}"
                )

        # Validate reasoning content is present when required
        # We don't require reasoning when using tools, as models tend not to return any on the final turn (both Sonnet and Gemini).
        trace_has_toolcalls = parsed_output.trace is not None and any(
            message.get("role", None) == "tool" for message in parsed_output.trace
        )
        if (
            provider.reasoning_capable
            and (
                not parsed_output.intermediate_outputs
                or "reasoning" not in parsed_output.intermediate_outputs
            )
            and not (
                provider.reasoning_optional_for_structured_output
                and self.has_structured_output()
            )
            and not (trace_has_toolcalls)
        ):
            raise RuntimeError(
                "Reasoning is required for this model, but no reasoning was returned."
            )

        # Generate the run and output
        run = self.generate_run(
            input, input_source, parsed_output, usage, run_output.trace
        )

        # Save the run if configured to do so, and we have a path to save to
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run, run_output

    def has_structured_output(self) -> bool:
        return self.output_schema is not None

    @abstractmethod
    def adapter_name(self) -> str:
        pass

    @abstractmethod
    async def _run(self, input: Dict | str) -> Tuple[RunOutput, Usage | None]:
        pass

    def build_prompt(self) -> str:
        # The prompt builder needs to know if we want to inject formatting instructions
        structured_output_mode = self.run_config.structured_output_mode
        add_json_instructions = self.has_structured_output() and (
            structured_output_mode == StructuredOutputMode.json_instructions
            or structured_output_mode
            == StructuredOutputMode.json_instruction_and_object
        )

        return self.prompt_builder.build_prompt(
            include_json_instructions=add_json_instructions
        )

    def build_chat_formatter(self, input: Dict | str) -> ChatFormatter:
        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and whether the model was finetuned with a specific chat strategy.

        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
        system_message = self.build_prompt()

        # If there is no COT prompt, use the single turn strategy, even when a tuned strategy is set: the tuned strategy is either already single turn, or won't work without a COT prompt.
        if not cot_prompt:
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn,
                system_message=system_message,
                user_input=input,
            )

        # Some models like finetunes are trained with a specific chat strategy. Use that.
        # However, don't use it if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
            return get_chat_formatter(
                strategy=tuned_chat_strategy,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

        # Pick the best chat strategy for the model given it has a COT prompt.
        reasoning_capable = self.model_provider().reasoning_capable
        if reasoning_capable:
            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
            # A simple message with the COT prompt appended to the message list is sufficient.
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn_r1_thinking,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
        else:
            # Unstructured output with COT
            # Two calls to separate the thinking from the final response
            return get_chat_formatter(
                strategy=ChatStrategy.two_message_cot,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

    # create a run and task output
    def generate_run(
        self,
        input: Dict | str,
        input_source: DataSource | None,
        run_output: RunOutput,
        usage: Usage | None = None,
        trace: list[ChatCompletionMessageParam] | None = None,
    ) -> TaskRun:
        # Convert input and output to JSON strings if they are dictionaries
        input_str = (
            json.dumps(input, ensure_ascii=False) if isinstance(input, dict) else input
        )
        output_str = (
            json.dumps(run_output.output, ensure_ascii=False)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        new_task_run = TaskRun(
            parent=self.task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                # Synthetic since an adapter, not a human, is creating this
                source=DataSource(
                    type=DataSourceType.synthetic,
                    properties=self._properties_for_task_output(),
                    run_config=self.run_config,
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.base_adapter_config.default_tags or [],
            usage=usage,
            trace=trace,
        )

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        props = {}

        props["adapter_name"] = self.adapter_name()

        # Legacy properties where we save the run_config details into custom properties.
        # These are now also saved in the run_config field.
        props["model_name"] = self.run_config.model_name
        props["model_provider"] = self.run_config.model_provider_name
        props["prompt_id"] = self.run_config.prompt_id
        props["structured_output_mode"] = self.run_config.structured_output_mode
        props["temperature"] = self.run_config.temperature
        props["top_p"] = self.run_config.top_p

        return props

    def update_run_config_unknown_structured_output_mode(self) -> None:
        structured_output_mode = self.run_config.structured_output_mode

        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
        # Look up our recommended mode from ml_model_list if we have one
        if structured_output_mode == StructuredOutputMode.unknown:
            new_run_config = self.run_config.model_copy(deep=True)
            structured_output_mode = default_structured_output_mode_for_model_provider(
                self.run_config.model_name,
                self.run_config.model_provider_name,
            )
            new_run_config.structured_output_mode = structured_output_mode
            self.run_config = new_run_config

    async def available_tools(self) -> list[KilnToolInterface]:
        tool_config = self.run_config.tools_config
        if tool_config is None or tool_config.tools is None:
            return []

        project = self.task.parent_project()
        if project is None:
            raise ValueError("Task must have a parent project to resolve tools")

        project_id = project.id
        if project_id is None:
            raise ValueError("Project must have an ID to resolve tools")

        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]

        # Check each tool has a unique name
        tool_names = [await tool.name() for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            raise ValueError(
                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
            )

        return tools
@dataclass
class AdapterConfig:
An adapter config holds config options that do NOT impact the output of the model.
For example: whether the run is saved, or whether we request additional data like logprobs.
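As a quick illustration, a config that skips saving, requests logprobs, and tags every generated run might look like the sketch below (the field values are arbitrary examples, not defaults):

config = AdapterConfig(
    allow_saving=False,     # don't persist generated TaskRuns to disk
    top_logprobs=5,         # request the top 5 logprobs per token, where the provider supports it
    default_tags=["eval"],  # tags applied to every TaskRun this adapter creates
)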
class BaseAdapter:
Base class for AI model adapters that handle task execution.
This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.
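For orientation, a minimal subclass only needs to implement the two abstract members, adapter_name() and _run(). The EchoAdapter below is a hypothetical sketch, not part of the library; it assumes RunOutput can be constructed from output and intermediate_outputs keyword arguments, as the attribute access in invoke_returning_run_output suggests:

class EchoAdapter(BaseAdapter):
    """Toy adapter that echoes its input back; for illustration only."""

    def adapter_name(self) -> str:
        return "echo_adapter"

    async def _run(self, input: Dict | str) -> Tuple[RunOutput, Usage | None]:
        # A real adapter would call a model provider here; we simply echo the input.
        output = input if isinstance(input, str) else json.dumps(input)
        return RunOutput(output=output, intermediate_outputs=None), None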
def model_provider(self) -> KilnModelProvider:
Lazy load the model provider for this adapter.
async def invoke(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
async def invoke_returning_run_output(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
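A sketch of a typical call site, assuming adapter is some concrete BaseAdapter implementation and the example input matches the task's input schema (both the adapter and the input dict are placeholders):

# Run the task and get a TaskRun back (saved automatically when autosave is configured)
run = await adapter.invoke({"topic": "greetings"})
print(run.output.output)         # validated output; a JSON string for structured tasks
print(run.intermediate_outputs)  # e.g. reasoning, when the model returned any

# When the raw RunOutput (including the trace) is also needed:
run, run_output = await adapter.invoke_returning_run_output({"topic": "greetings"})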
def build_prompt(self) -> str:
def build_chat_formatter(self, input: Dict | str) -> ChatFormatter:
def generate_run(
    self,
    input: Union[Dict, str],
    input_source: kiln_ai.datamodel.DataSource | None,
    run_output: kiln_ai.adapters.run_output.RunOutput,
    usage: kiln_ai.datamodel.Usage | None = None,
    trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, openai.types.chat.chat_completion_tool_message_param.ChatCompletionToolMessageParam, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None,
) -> kiln_ai.datamodel.TaskRun:
def update_run_config_unknown_structured_output_mode(self) -> None:
async def available_tools(self) -> list[KilnToolInterface]: