kiln_ai.adapters.model_adapters.base_adapter

import json
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, Tuple

from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter
from kiln_ai.adapters.ml_model_list import (
    KilnModelProvider,
    StructuredOutputMode,
    default_structured_output_mode_for_model_provider,
)
from kiln_ai.adapters.parsers.json_parser import parse_json_string
from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
from kiln_ai.adapters.prompt_builders import prompt_builder_from_id
from kiln_ai.adapters.provider_tools import kiln_model_provider_from
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import (
    DataSource,
    DataSourceType,
    Task,
    TaskOutput,
    TaskRun,
    Usage,
)
from kiln_ai.datamodel.datamodel_enums import ChatStrategy
from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
from kiln_ai.datamodel.task import RunConfigProperties
from kiln_ai.tools import KilnToolInterface
from kiln_ai.tools.tool_registry import tool_from_id
from kiln_ai.utils.config import Config
from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam


@dataclass
class AdapterConfig:
    """
    An adapter config holds options that do NOT impact the output of the model.

    For example: whether the run is saved, or whether we request additional data like logprobs.
    """

    allow_saving: bool = True
    top_logprobs: int | None = None
    default_tags: list[str] | None = None


class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.
    """

    def __init__(
        self,
        task: Task,
        run_config: RunConfigProperties,
        config: AdapterConfig | None = None,
    ):
        self.task = task
        self.run_config = run_config
        self.update_run_config_unknown_structured_output_mode()
        self.prompt_builder = prompt_builder_from_id(run_config.prompt_id, task)
        self._model_provider: KilnModelProvider | None = None

        self.output_schema = task.output_json_schema
        self.input_schema = task.input_json_schema
        self.base_adapter_config = config or AdapterConfig()

    def model_provider(self) -> KilnModelProvider:
        """
        Lazy load the model provider for this adapter.
        """
        if self._model_provider is not None:
            return self._model_provider
        if not self.run_config.model_name or not self.run_config.model_provider_name:
            raise ValueError("model_name and model_provider_name must be provided")
        self._model_provider = kiln_model_provider_from(
            self.run_config.model_name, self.run_config.model_provider_name
        )
        if not self._model_provider:
            raise ValueError(
                f"model_provider_name {self.run_config.model_provider_name} not found for model {self.run_config.model_name}"
            )
        return self._model_provider

    async def invoke(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        run_output, _ = await self.invoke_returning_run_output(input, input_source)
        return run_output

    async def invoke_returning_run_output(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        # validate input
        if self.input_schema is not None:
            if not isinstance(input, dict):
                raise ValueError(f"structured input is not a dict: {input}")

            validate_schema_with_value_error(
                input,
                self.input_schema,
                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )

        # Format model input for model call (we save the original input in the task without formatting)
        formatted_input = input
        formatter_id = self.model_provider().formatter
        if formatter_id is not None:
            formatter = request_formatter_from_id(formatter_id)
            formatted_input = formatter.format_input(input)

        # Run
        run_output, usage = await self._run(formatted_input)

        # Parse
        provider = self.model_provider()
        parser = model_parser_from_id(provider.parser)
        parsed_output = parser.parse_output(original_output=run_output)

        # validate output
        if self.output_schema is not None:
            # Parse json to dict if we have structured output
            if isinstance(parsed_output.output, str):
                parsed_output.output = parse_json_string(parsed_output.output)

            if not isinstance(parsed_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {parsed_output.output}"
                )
            validate_schema_with_value_error(
                parsed_output.output,
                self.output_schema,
                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )
        else:
            if not isinstance(parsed_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {parsed_output.output}"
                )

        # Validate reasoning content is present when required
        # We don't require reasoning when using tools, as models tend not to return any on the final turn (both Sonnet and Gemini).
        trace_has_toolcalls = parsed_output.trace is not None and any(
            message.get("role", None) == "tool" for message in parsed_output.trace
        )
        if (
            provider.reasoning_capable
            and (
                not parsed_output.intermediate_outputs
                or "reasoning" not in parsed_output.intermediate_outputs
            )
            and not (
                provider.reasoning_optional_for_structured_output
                and self.has_structured_output()
            )
            and not (trace_has_toolcalls)
        ):
            raise RuntimeError(
                "Reasoning is required for this model, but no reasoning was returned."
            )

        # Generate the run and output
        run = self.generate_run(
            input, input_source, parsed_output, usage, run_output.trace
        )

        # Save the run if configured to do so, and we have a path to save to
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run, run_output

    def has_structured_output(self) -> bool:
        return self.output_schema is not None

    @abstractmethod
    def adapter_name(self) -> str:
        pass

    @abstractmethod
    async def _run(self, input: Dict | str) -> Tuple[RunOutput, Usage | None]:
        pass

    def build_prompt(self) -> str:
        # The prompt builder needs to know if we want to inject formatting instructions
        structured_output_mode = self.run_config.structured_output_mode
        add_json_instructions = self.has_structured_output() and (
            structured_output_mode == StructuredOutputMode.json_instructions
            or structured_output_mode
            == StructuredOutputMode.json_instruction_and_object
        )

        return self.prompt_builder.build_prompt(
            include_json_instructions=add_json_instructions
        )

    def build_chat_formatter(self, input: Dict | str) -> ChatFormatter:
        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and whether the model was finetuned with a specific chat strategy.

        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
        system_message = self.build_prompt()

        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
        if not cot_prompt:
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn,
                system_message=system_message,
                user_input=input,
            )

        # Some models, like finetunes, are trained with a specific chat strategy. Use that.
        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
            return get_chat_formatter(
                strategy=tuned_chat_strategy,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

        # Pick the best chat strategy for the model given it has a COT prompt.
        reasoning_capable = self.model_provider().reasoning_capable
        if reasoning_capable:
            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
            # A simple message with the COT prompt appended to the message list is sufficient
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn_r1_thinking,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
        else:
            # Unstructured output with COT
            # Two calls to separate the thinking from the final response
            return get_chat_formatter(
                strategy=ChatStrategy.two_message_cot,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

    # create a run and task output
    def generate_run(
        self,
        input: Dict | str,
        input_source: DataSource | None,
        run_output: RunOutput,
        usage: Usage | None = None,
        trace: list[ChatCompletionMessageParam] | None = None,
    ) -> TaskRun:
        # Convert input and output to JSON strings if they are dictionaries
        input_str = (
            json.dumps(input, ensure_ascii=False) if isinstance(input, dict) else input
        )
        output_str = (
            json.dumps(run_output.output, ensure_ascii=False)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        new_task_run = TaskRun(
            parent=self.task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                # Synthetic since an adapter, not a human, is creating this
                source=DataSource(
                    type=DataSourceType.synthetic,
                    properties=self._properties_for_task_output(),
                    run_config=self.run_config,
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.base_adapter_config.default_tags or [],
            usage=usage,
            trace=trace,
        )

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        props = {}

        props["adapter_name"] = self.adapter_name()

        # Legacy properties where we save the run_config details into custom properties.
        # These are now also saved in the run_config field.
        props["model_name"] = self.run_config.model_name
        props["model_provider"] = self.run_config.model_provider_name
        props["prompt_id"] = self.run_config.prompt_id
        props["structured_output_mode"] = self.run_config.structured_output_mode
        props["temperature"] = self.run_config.temperature
        props["top_p"] = self.run_config.top_p

        return props

    def update_run_config_unknown_structured_output_mode(self) -> None:
        structured_output_mode = self.run_config.structured_output_mode

        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
        # Look up our recommended mode from ml_model_list if we have one
        if structured_output_mode == StructuredOutputMode.unknown:
            new_run_config = self.run_config.model_copy(deep=True)
            structured_output_mode = default_structured_output_mode_for_model_provider(
                self.run_config.model_name,
                self.run_config.model_provider_name,
            )
            new_run_config.structured_output_mode = structured_output_mode
            self.run_config = new_run_config

    async def available_tools(self) -> list[KilnToolInterface]:
        tool_config = self.run_config.tools_config
        if tool_config is None or tool_config.tools is None:
            return []

        project = self.task.parent_project()
        if project is None:
            raise ValueError("Task must have a parent project to resolve tools")

        project_id = project.id
        if project_id is None:
            raise ValueError("Project must have an ID to resolve tools")

        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]

        # Check each tool has a unique name
        tool_names = [await tool.name() for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            raise ValueError(
                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
            )

        return tools
@dataclass
class AdapterConfig:

An adapter config holds options that do NOT impact the output of the model.

For example: whether the run is saved, or whether we request additional data like logprobs.

AdapterConfig( allow_saving: bool = True, top_logprobs: int | None = None, default_tags: list[str] | None = None)
allow_saving: bool = True
top_logprobs: int | None = None
default_tags: list[str] | None = None
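A minimal construction sketch (the values are illustrative, not recommended defaults): an AdapterConfig that skips autosaving, requests the top 5 logprobs, and tags any runs it creates.

from kiln_ai.adapters.model_adapters.base_adapter import AdapterConfig

# Illustrative values: don't save runs, request logprobs, and tag created runs.
config = AdapterConfig(
    allow_saving=False,
    top_logprobs=5,
    default_tags=["experiment"],
)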
class BaseAdapter:

Base class for AI model adapters that handle task execution.

This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.

task
run_config
prompt_builder
output_schema
input_schema
base_adapter_config
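A minimal sketch of a concrete adapter, assuming RunOutput accepts output and intermediate_outputs keyword arguments; a real adapter would call a model API inside _run instead of echoing the input. Input/output validation, prompt building, run saving, and invoke itself are inherited from BaseAdapter.

from typing import Dict, Tuple

from kiln_ai.adapters.model_adapters.base_adapter import BaseAdapter
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import Usage


class EchoAdapter(BaseAdapter):
    """Toy adapter: returns the input text as the output."""

    def adapter_name(self) -> str:
        return "echo_adapter"

    async def _run(self, input: Dict | str) -> Tuple[RunOutput, Usage | None]:
        # A real adapter would build messages (see build_chat_formatter),
        # call its model provider, and return the model output plus token usage.
        output = input if isinstance(input, str) else str(input)
        return RunOutput(output=output, intermediate_outputs=None), None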
def model_provider(self) -> kiln_ai.adapters.ml_model_list.KilnModelProvider:

Lazy load the model provider for this adapter.
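A short sketch of the lazy-loading behaviour described above, assuming adapter is an already-constructed BaseAdapter subclass instance.

provider = adapter.model_provider()       # first call resolves and caches the provider
same_provider = adapter.model_provider()  # later calls return the cached instance
assert provider is same_provider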

async def invoke( self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
async def invoke_returning_run_output( self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
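A hedged usage sketch for invoke (names are illustrative; the adapter is assumed to be built elsewhere). invoke validates the input against the task's input schema, calls _run, validates the output, optionally saves the run, and returns the resulting TaskRun; invoke_returning_run_output additionally returns the raw RunOutput.

from kiln_ai.adapters.model_adapters.base_adapter import BaseAdapter


async def run_example(adapter: BaseAdapter) -> None:
    # `adapter` is a concrete BaseAdapter subclass instance, built elsewhere.
    task_run = await adapter.invoke("Summarize the moon landing in one sentence.")
    print(task_run.output.output)  # the model's final, validated answer

    # For tasks with an input schema, pass a dict matching that schema instead:
    # task_run = await adapter.invoke({"topic": "moon landing", "length": "short"})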
def has_structured_output(self) -> bool:
@abstractmethod
def adapter_name(self) -> str:
def build_prompt(self) -> str:
def build_chat_formatter( self, input: Union[Dict, str]) -> kiln_ai.adapters.chat.ChatFormatter:
def generate_run( self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None, run_output: kiln_ai.adapters.run_output.RunOutput, usage: kiln_ai.datamodel.Usage | None = None, trace: list[kiln_ai.utils.open_ai_types.ChatCompletionMessageParam] | None = None) -> kiln_ai.datamodel.TaskRun:
def update_run_config_unknown_structured_output_mode(self) -> None:
async def available_tools(self) -> list[kiln_ai.tools.KilnToolInterface]: