kiln_ai.adapters.model_adapters.base_adapter

  1import json
  2from abc import ABCMeta, abstractmethod
  3from dataclasses import dataclass
  4from typing import Dict, Tuple
  5
  6from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter
  7from kiln_ai.adapters.ml_model_list import (
  8    KilnModelProvider,
  9    StructuredOutputMode,
 10    default_structured_output_mode_for_model_provider,
 11)
 12from kiln_ai.adapters.parsers.json_parser import parse_json_string
 13from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
 14from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
 15from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id
 16from kiln_ai.adapters.provider_tools import kiln_model_provider_from
 17from kiln_ai.adapters.run_output import RunOutput
 18from kiln_ai.datamodel import (
 19    DataSource,
 20    DataSourceType,
 21    Task,
 22    TaskOutput,
 23    TaskRun,
 24    Usage,
 25)
 26from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
 27from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
 28from kiln_ai.datamodel.task import RunConfigProperties
 29from kiln_ai.tools import KilnToolInterface
 30from kiln_ai.tools.tool_registry import tool_from_id
 31from kiln_ai.utils.config import Config
 32from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam
 33
 34
 35@dataclass
 36class AdapterConfig:
 37    """
 38    An adapter config holds options that do NOT impact the output of the model.
 39
 40    For example: whether it's saved, or whether we request additional data like logprobs.
 41    """
 42
 43    allow_saving: bool = True
 44    top_logprobs: int | None = None
 45    default_tags: list[str] | None = None
 46
 47    """
 48    A custom prompt builder can be injected to override the system prompt building process.
 49    If not provided, the prompt builder will be created from the run_config.prompt_id which
 50    may load additional files from disk.
 51    """
 52    prompt_builder: BasePromptBuilder | None = None
 53
 54
 55class BaseAdapter(metaclass=ABCMeta):
 56    """Base class for AI model adapters that handle task execution.
 57
 58    This abstract class provides the foundation for implementing model-specific adapters
 59    that can process tasks with structured or unstructured inputs/outputs. It handles
 60    input/output validation, prompt building, and run tracking.
 61
 62    Prompt building is handled internally by the adapter, which uses a prompt builder
 63    based on the run config. To override the prompt building behavior, pass a custom prompt
 64    builder to the adapter config.
 65    """
 66
 67    def __init__(
 68        self,
 69        task: Task,
 70        run_config: RunConfigProperties,
 71        config: AdapterConfig | None = None,
 72    ):
 73        self.task = task
 74        self.run_config: RunConfigProperties = run_config
 75        self.update_run_config_unknown_structured_output_mode()
 76        self.base_adapter_config = config or AdapterConfig()
 77
 78        self.prompt_builder = (
 79            self.base_adapter_config.prompt_builder
 80            or prompt_builder_from_id(run_config.prompt_id, task)
 81        )
 82        self._model_provider: KilnModelProvider | None = None
 83
 84        self.output_schema = task.output_json_schema
 85        self.input_schema = task.input_json_schema
 86
 87    def model_provider(self) -> KilnModelProvider:
 88        """
 89        Lazy load the model provider for this adapter.
 90        """
 91        if self._model_provider is not None:
 92            return self._model_provider
 93        if not self.run_config.model_name or not self.run_config.model_provider_name:
 94            raise ValueError("model_name and model_provider_name must be provided")
 95        self._model_provider = kiln_model_provider_from(
 96            self.run_config.model_name, self.run_config.model_provider_name
 97        )
 98        if not self._model_provider:
 99            raise ValueError(
100                f"model_provider_name {self.run_config.model_provider_name} not found for model {self.run_config.model_name}"
101            )
102        return self._model_provider
103
104    async def invoke(
105        self,
106        input: InputType,
107        input_source: DataSource | None = None,
108    ) -> TaskRun:
109        run_output, _ = await self.invoke_returning_run_output(input, input_source)
110        return run_output
111
112    async def invoke_returning_run_output(
113        self,
114        input: InputType,
115        input_source: DataSource | None = None,
116    ) -> Tuple[TaskRun, RunOutput]:
117        # validate input, allowing arrays
118        if self.input_schema is not None:
119            validate_schema_with_value_error(
120                input,
121                self.input_schema,
122                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
123                require_object=False,
124            )
125
126        # Format model input for model call (we save the original input in the task without formatting)
127        formatted_input = input
128        formatter_id = self.model_provider().formatter
129        if formatter_id is not None:
130            formatter = request_formatter_from_id(formatter_id)
131            formatted_input = formatter.format_input(input)
132
133        # Run
134        run_output, usage = await self._run(formatted_input)
135
136        # Parse
137        provider = self.model_provider()
138        parser = model_parser_from_id(provider.parser)
139        parsed_output = parser.parse_output(original_output=run_output)
140
141        # validate output
142        if self.output_schema is not None:
143            # Parse json to dict if we have structured output
144            if isinstance(parsed_output.output, str):
145                parsed_output.output = parse_json_string(parsed_output.output)
146
147            if not isinstance(parsed_output.output, dict):
148                raise RuntimeError(
149                    f"structured response is not a dict: {parsed_output.output}"
150                )
151            validate_schema_with_value_error(
152                parsed_output.output,
153                self.output_schema,
154                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
155            )
156        else:
157            if not isinstance(parsed_output.output, str):
158                raise RuntimeError(
159                    f"response is not a string for non-structured task: {parsed_output.output}"
160                )
161
162        # Validate that reasoning content is present when it's required.
163        # We don't require reasoning when using tools, as models tend not to return any on the final turn (both Sonnet and Gemini).
164        trace_has_toolcalls = parsed_output.trace is not None and any(
165            message.get("role", None) == "tool" for message in parsed_output.trace
166        )
167        if (
168            provider.reasoning_capable
169            and (
170                not parsed_output.intermediate_outputs
171                or "reasoning" not in parsed_output.intermediate_outputs
172            )
173            and not (
174                provider.reasoning_optional_for_structured_output
175                and self.has_structured_output()
176            )
177            and not (trace_has_toolcalls)
178        ):
179            raise RuntimeError(
180                "Reasoning is required for this model, but no reasoning was returned."
181            )
182
183        # Generate the run and output
184        run = self.generate_run(
185            input, input_source, parsed_output, usage, run_output.trace
186        )
187
188        # Save the run if configured to do so, and we have a path to save to
189        if (
190            self.base_adapter_config.allow_saving
191            and Config.shared().autosave_runs
192            and self.task.path is not None
193        ):
194            run.save_to_file()
195        else:
196            # Clear the ID to indicate it's not persisted
197            run.id = None
198
199        return run, run_output
200
201    def has_structured_output(self) -> bool:
202        return self.output_schema is not None
203
204    @abstractmethod
205    def adapter_name(self) -> str:
206        pass
207
208    @abstractmethod
209    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
210        pass
211
212    def build_prompt(self) -> str:
213        # The prompt builder needs to know if we want to inject formatting instructions
214        structured_output_mode = self.run_config.structured_output_mode
215        add_json_instructions = self.has_structured_output() and (
216            structured_output_mode == StructuredOutputMode.json_instructions
217            or structured_output_mode
218            == StructuredOutputMode.json_instruction_and_object
219        )
220
221        return self.prompt_builder.build_prompt(
222            include_json_instructions=add_json_instructions
223        )
224
225    def build_chat_formatter(self, input: InputType) -> ChatFormatter:
226        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
227
228        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
229        system_message = self.build_prompt()
230
231        # If there's no COT prompt, use the single turn strategy, even when a tuned strategy is set: the tuned strategy is either already single turn, or won't work without a COT prompt.
232        if not cot_prompt:
233            return get_chat_formatter(
234                strategy=ChatStrategy.single_turn,
235                system_message=system_message,
236                user_input=input,
237            )
238
239        # Some models like finetunes are trained with a specific chat strategy. Use that.
240        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
241        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
242        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
243            return get_chat_formatter(
244                strategy=tuned_chat_strategy,
245                system_message=system_message,
246                user_input=input,
247                thinking_instructions=cot_prompt,
248            )
249
250        # Pick the best chat strategy for the model, given it has a COT prompt.
251        reasoning_capable = self.model_provider().reasoning_capable
252        if reasoning_capable:
253            # "Thinking" LLM designed to output thinking in a structured format. We'll use it's native format.
254            # A simple message with the COT prompt appended to the message list is sufficient
255            return get_chat_formatter(
256                strategy=ChatStrategy.single_turn_r1_thinking,
257                system_message=system_message,
258                user_input=input,
259                thinking_instructions=cot_prompt,
260            )
261        else:
262            # Unstructured output with COT
263            # Two calls to separate the thinking from the final response
264            return get_chat_formatter(
265                strategy=ChatStrategy.two_message_cot,
266                system_message=system_message,
267                user_input=input,
268                thinking_instructions=cot_prompt,
269            )
270
271    # create a run and task output
272    def generate_run(
273        self,
274        input: InputType,
275        input_source: DataSource | None,
276        run_output: RunOutput,
277        usage: Usage | None = None,
278        trace: list[ChatCompletionMessageParam] | None = None,
279    ) -> TaskRun:
280        # Convert input and output to JSON strings if they aren't strings
281        input_str = (
282            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
283        )
284        output_str = (
285            json.dumps(run_output.output, ensure_ascii=False)
286            if isinstance(run_output.output, dict)
287            else run_output.output
288        )
289
290        # If no input source is provided, use the human data source
291        if input_source is None:
292            input_source = DataSource(
293                type=DataSourceType.human,
294                properties={"created_by": Config.shared().user_id},
295            )
296
297        new_task_run = TaskRun(
298            parent=self.task,
299            input=input_str,
300            input_source=input_source,
301            output=TaskOutput(
302                output=output_str,
303                # Synthetic since an adapter, not a human, is creating this
304                source=DataSource(
305                    type=DataSourceType.synthetic,
306                    properties=self._properties_for_task_output(),
307                    run_config=self.run_config,
308                ),
309            ),
310            intermediate_outputs=run_output.intermediate_outputs,
311            tags=self.base_adapter_config.default_tags or [],
312            usage=usage,
313            trace=trace,
314        )
315
316        return new_task_run
317
318    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
319        props = {}
320
321        props["adapter_name"] = self.adapter_name()
322
323        # Legacy properties where we save the run_config details into custom properties.
324        # These are now also saved in the run_config field.
325        props["model_name"] = self.run_config.model_name
326        props["model_provider"] = self.run_config.model_provider_name
327        props["prompt_id"] = self.run_config.prompt_id
328        props["structured_output_mode"] = self.run_config.structured_output_mode
329        props["temperature"] = self.run_config.temperature
330        props["top_p"] = self.run_config.top_p
331
332        return props
333
334    def update_run_config_unknown_structured_output_mode(self) -> None:
335        structured_output_mode = self.run_config.structured_output_mode
336
337        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
338        # Look up our recommended mode from ml_model_list if we have one
339        if structured_output_mode == StructuredOutputMode.unknown:
340            new_run_config = self.run_config.model_copy(deep=True)
341            structured_output_mode = default_structured_output_mode_for_model_provider(
342                self.run_config.model_name,
343                self.run_config.model_provider_name,
344            )
345            new_run_config.structured_output_mode = structured_output_mode
346            self.run_config = new_run_config
347
348    async def available_tools(self) -> list[KilnToolInterface]:
349        tool_config = self.run_config.tools_config
350        if tool_config is None or tool_config.tools is None:
351            return []
352
353        project = self.task.parent_project()
354        if project is None:
355            raise ValueError("Task must have a parent project to resolve tools")
356
357        project_id = project.id
358        if project_id is None:
359            raise ValueError("Project must have an ID to resolve tools")
360
361        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]
362
363        # Check each tool has a unique name
364        tool_names = [await tool.name() for tool in tools]
365        if len(tool_names) != len(set(tool_names)):
366            raise ValueError(
367                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
368            )
369
370        return tools
@dataclass
class AdapterConfig:

An adapter config holds options that do NOT impact the output of the model.

For example: whether it's saved, or whether we request additional data like logprobs.

AdapterConfig( allow_saving: bool = True, top_logprobs: int | None = None, default_tags: list[str] | None = None, prompt_builder: kiln_ai.adapters.prompt_builders.BasePromptBuilder | None = None)
allow_saving: bool = True
top_logprobs: int | None = None
default_tags: list[str] | None = None

prompt_builder: kiln_ai.adapters.prompt_builders.BasePromptBuilder | None = None

A custom prompt builder can be injected to override the system prompt building process. If not provided, the prompt builder will be created from the run_config.prompt_id, which may load additional files from disk.
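For example, a config that keeps generated runs out of the local dataset, requests extra logprob data, and tags any runs it does produce might look like the sketch below (the values are illustrative, not recommendations):

from kiln_ai.adapters.model_adapters.base_adapter import AdapterConfig

# Illustrative values only.
config = AdapterConfig(
    allow_saving=False,          # don't persist generated TaskRuns to disk
    top_logprobs=5,              # request additional logprob data from the provider
    default_tags=["synthetic"],  # tags applied to every generated TaskRun
)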

class BaseAdapter:

Base class for AI model adapters that handle task execution.

This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.

Prompt building is handled internally by the adapter, which uses a prompt builder based on the run config. To override the prompt building behavior, pass a custom prompt builder to the adapter config.
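Concrete adapters only need to supply adapter_name() and the async _run() hook; the base class handles input/output validation, prompt building, parsing, and run tracking around them. A minimal sketch of a hypothetical subclass (the RunOutput keyword arguments are an assumption based on how its fields are read in this module):

from typing import Tuple

from kiln_ai.adapters.model_adapters.base_adapter import BaseAdapter
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import Usage


class EchoAdapter(BaseAdapter):
    """Hypothetical adapter that echoes its input back; for illustration only."""

    def adapter_name(self) -> str:
        return "echo_adapter"

    async def _run(self, input) -> Tuple[RunOutput, Usage | None]:
        # A real adapter would build messages with self.build_chat_formatter(input)
        # and call a model API here. RunOutput keyword arguments are assumed from
        # how the fields are used elsewhere in this module.
        return RunOutput(output=str(input), intermediate_outputs=None), None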

task
run_config: kiln_ai.datamodel.run_config.RunConfigProperties
base_adapter_config
prompt_builder
output_schema
input_schema
def model_provider(self) -> kiln_ai.adapters.ml_model_list.KilnModelProvider:

Lazy load the model provider for this adapter.
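The provider is resolved from the run config on first use and cached on the adapter; a missing or unrecognized model/provider pair raises ValueError. A small hedged sketch of call-site handling, assuming adapter is a concrete BaseAdapter instance:

try:
    provider = adapter.model_provider()  # resolved once, cached afterwards
    print(provider.reasoning_capable)
except ValueError:
    # Raised when model_name / model_provider_name are unset or not found.
    raise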

async def invoke( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
async def invoke_returning_run_output( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
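invoke runs the full pipeline (validate input, format it for the provider, call the model via _run, parse and validate the output, then save the TaskRun when autosaving is enabled) and returns only the TaskRun; invoke_returning_run_output additionally returns the raw RunOutput with trace and intermediate outputs. A usage sketch, assuming adapter is a concrete BaseAdapter instance for a task with plain-text input:

import asyncio

async def main() -> None:
    adapter = ...  # any concrete BaseAdapter subclass, built from a Task and RunConfigProperties

    # Simple case: just the TaskRun.
    run = await adapter.invoke("Summarize the input in one sentence.")
    print(run.output.output)

    # When the raw model output is also needed (trace, intermediate outputs):
    run, run_output = await adapter.invoke_returning_run_output(
        "Summarize the input in one sentence."
    )
    print(run_output.intermediate_outputs)

asyncio.run(main())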
def has_structured_output(self) -> bool:
@abstractmethod
def adapter_name(self) -> str:
def build_prompt(self) -> str:
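JSON formatting instructions are only injected into the prompt when the task has an output schema and the run config uses one of the instruction-based structured output modes. Restated as a standalone helper for clarity (not part of the class):

from kiln_ai.adapters.ml_model_list import StructuredOutputMode


def wants_json_instructions(has_output_schema: bool, mode: StructuredOutputMode) -> bool:
    """Mirrors the condition BaseAdapter.build_prompt uses."""
    return has_output_schema and mode in (
        StructuredOutputMode.json_instructions,
        StructuredOutputMode.json_instruction_and_object,
    )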
def build_chat_formatter( self, input: Union[Dict[str, Any], List[Any], str]) -> kiln_ai.adapters.chat.ChatFormatter:
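The formatter choice reduces to a short decision order: no COT prompt means single turn; a non-single-turn tuned strategy wins next; otherwise reasoning-capable models get the R1-style single turn and everything else gets the two-message COT flow. A standalone restatement of that logic (not part of the class):

from kiln_ai.datamodel.datamodel_enums import ChatStrategy


def choose_chat_strategy(
    cot_prompt: str | None,
    tuned_chat_strategy: ChatStrategy | None,
    reasoning_capable: bool,
) -> ChatStrategy:
    """Mirrors the strategy selection in BaseAdapter.build_chat_formatter."""
    if not cot_prompt:
        return ChatStrategy.single_turn
    if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
        return tuned_chat_strategy
    if reasoning_capable:
        return ChatStrategy.single_turn_r1_thinking
    return ChatStrategy.two_message_cot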
def generate_run( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, run_output: kiln_ai.adapters.run_output.RunOutput, usage: kiln_ai.datamodel.Usage | None = None, trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None) -> kiln_ai.datamodel.TaskRun:
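generate_run JSON-encodes dict inputs and outputs, defaults a missing input source to a human DataSource for the current user, and attaches a synthetic DataSource (including the run config) to the output; it does not save the run. A hedged sketch of calling it directly, assuming adapter is a concrete BaseAdapter instance and that RunOutput accepts these keyword arguments:

from kiln_ai.adapters.run_output import RunOutput

run = adapter.generate_run(
    input={"question": "What is 2 + 2?"},  # dicts are JSON-encoded into TaskRun.input
    input_source=None,                      # None -> recorded as a human DataSource
    run_output=RunOutput(output={"answer": 4}, intermediate_outputs=None),
    usage=None,
)
print(run.output.output)  # '{"answer": 4}'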
def update_run_config_unknown_structured_output_mode(self) -> None:
async def available_tools(self) -> list[kiln_ai.tools.KilnToolInterface]:
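available_tools resolves the tool IDs from the run config's tools_config, requires the task to belong to a project with an ID, and raises ValueError when two tools share a name. A small usage sketch, assuming adapter is a concrete BaseAdapter instance:

async def list_tool_names(adapter) -> list[str]:
    # Raises ValueError if the task has no parent project or if tool names collide.
    tools = await adapter.available_tools()
    return [await tool.name() for tool in tools]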