kiln_ai.adapters.model_adapters.base_adapter

import json
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict, Tuple

from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter
from kiln_ai.adapters.ml_model_list import (
    KilnModelProvider,
    StructuredOutputMode,
    default_structured_output_mode_for_model_provider,
)
from kiln_ai.adapters.parsers.json_parser import parse_json_string
from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
from kiln_ai.adapters.prompt_builders import prompt_builder_from_id
from kiln_ai.adapters.provider_tools import kiln_model_provider_from
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import (
    DataSource,
    DataSourceType,
    Task,
    TaskOutput,
    TaskRun,
    Usage,
)
from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
from kiln_ai.datamodel.task import RunConfigProperties
from kiln_ai.tools import KilnToolInterface
from kiln_ai.tools.tool_registry import tool_from_id
from kiln_ai.utils.config import Config
from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam


@dataclass
class AdapterConfig:
    """
    An adapter config holds options that do NOT impact the output of the model.

    For example: whether the run is saved, or whether we request additional data like logprobs.
    """

    allow_saving: bool = True
    top_logprobs: int | None = None
    default_tags: list[str] | None = None


class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.
    """

    def __init__(
        self,
        task: Task,
        run_config: RunConfigProperties,
        config: AdapterConfig | None = None,
    ):
        self.task = task
        self.run_config = run_config
        self.update_run_config_unknown_structured_output_mode()
        self.prompt_builder = prompt_builder_from_id(run_config.prompt_id, task)
        self._model_provider: KilnModelProvider | None = None

        self.output_schema = task.output_json_schema
        self.input_schema = task.input_json_schema
        self.base_adapter_config = config or AdapterConfig()

    def model_provider(self) -> KilnModelProvider:
        """
        Lazy load the model provider for this adapter.
        """
        if self._model_provider is not None:
            return self._model_provider
        if not self.run_config.model_name or not self.run_config.model_provider_name:
            raise ValueError("model_name and model_provider_name must be provided")
        self._model_provider = kiln_model_provider_from(
            self.run_config.model_name, self.run_config.model_provider_name
        )
        if not self._model_provider:
            raise ValueError(
                f"model_provider_name {self.run_config.model_provider_name} not found for model {self.run_config.model_name}"
            )
        return self._model_provider

    async def invoke(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        run_output, _ = await self.invoke_returning_run_output(input, input_source)
        return run_output

    async def invoke_returning_run_output(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        # Validate input, allowing arrays
        if self.input_schema is not None:
            validate_schema_with_value_error(
                input,
                self.input_schema,
                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
                require_object=False,
            )

        # Format model input for the model call (we save the original input in the task without formatting)
        formatted_input = input
        formatter_id = self.model_provider().formatter
        if formatter_id is not None:
            formatter = request_formatter_from_id(formatter_id)
            formatted_input = formatter.format_input(input)

        # Run
        run_output, usage = await self._run(formatted_input)

        # Parse
        provider = self.model_provider()
        parser = model_parser_from_id(provider.parser)
        parsed_output = parser.parse_output(original_output=run_output)

        # Validate output
        if self.output_schema is not None:
            # Parse JSON to a dict if we have structured output
            if isinstance(parsed_output.output, str):
                parsed_output.output = parse_json_string(parsed_output.output)

            if not isinstance(parsed_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {parsed_output.output}"
                )
            validate_schema_with_value_error(
                parsed_output.output,
                self.output_schema,
                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )
        else:
            if not isinstance(parsed_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {parsed_output.output}"
                )

        # Validate reasoning content is present when required.
        # We don't require reasoning when using tools, as models (both Sonnet and Gemini) tend not to return any on the final turn.
        trace_has_toolcalls = parsed_output.trace is not None and any(
            message.get("role", None) == "tool" for message in parsed_output.trace
        )
        if (
            provider.reasoning_capable
            and (
                not parsed_output.intermediate_outputs
                or "reasoning" not in parsed_output.intermediate_outputs
            )
            and not (
                provider.reasoning_optional_for_structured_output
                and self.has_structured_output()
            )
            and not (trace_has_toolcalls)
        ):
            raise RuntimeError(
                "Reasoning is required for this model, but no reasoning was returned."
            )

        # Generate the run and output
        run = self.generate_run(
            input, input_source, parsed_output, usage, run_output.trace
        )

        # Save the run if configured to do so, and we have a path to save to
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run, run_output

    def has_structured_output(self) -> bool:
        return self.output_schema is not None

    @abstractmethod
    def adapter_name(self) -> str:
        pass

    @abstractmethod
    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
        pass

    def build_prompt(self) -> str:
        # The prompt builder needs to know if we want to inject formatting instructions
        structured_output_mode = self.run_config.structured_output_mode
        add_json_instructions = self.has_structured_output() and (
            structured_output_mode == StructuredOutputMode.json_instructions
            or structured_output_mode
            == StructuredOutputMode.json_instruction_and_object
        )

        return self.prompt_builder.build_prompt(
            include_json_instructions=add_json_instructions
        )

    def build_chat_formatter(self, input: InputType) -> ChatFormatter:
        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and whether the model was finetuned with a specific chat strategy.

        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
        system_message = self.build_prompt()

        # If there is no COT prompt, use the single-turn strategy, even when a tuned strategy is set: the tuned strategy is either already single turn, or won't work without a COT prompt.
        if not cot_prompt:
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn,
                system_message=system_message,
                user_input=input,
            )

        # Some models, like finetunes, are trained with a specific chat strategy. Use that.
        # However, don't use it if it is single turn: the user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
            return get_chat_formatter(
                strategy=tuned_chat_strategy,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

        # Pick the best chat strategy for the model given it has a COT prompt.
        reasoning_capable = self.model_provider().reasoning_capable
        if reasoning_capable:
            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
            # A simple message with the COT prompt appended to the message list is sufficient.
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn_r1_thinking,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
        else:
            # Unstructured output with COT.
            # Two calls to separate the thinking from the final response.
            return get_chat_formatter(
                strategy=ChatStrategy.two_message_cot,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

    # Create a run and task output
    def generate_run(
        self,
        input: InputType,
        input_source: DataSource | None,
        run_output: RunOutput,
        usage: Usage | None = None,
        trace: list[ChatCompletionMessageParam] | None = None,
    ) -> TaskRun:
        # Convert input and output to JSON strings if they aren't strings
        input_str = (
            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
        )
        output_str = (
            json.dumps(run_output.output, ensure_ascii=False)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        new_task_run = TaskRun(
            parent=self.task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                # Synthetic since an adapter, not a human, is creating this
                source=DataSource(
                    type=DataSourceType.synthetic,
                    properties=self._properties_for_task_output(),
                    run_config=self.run_config,
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.base_adapter_config.default_tags or [],
            usage=usage,
            trace=trace,
        )

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        props = {}

        props["adapter_name"] = self.adapter_name()

        # Legacy properties where we save the run_config details into custom properties.
        # These are now also saved in the run_config field.
        props["model_name"] = self.run_config.model_name
        props["model_provider"] = self.run_config.model_provider_name
        props["prompt_id"] = self.run_config.prompt_id
        props["structured_output_mode"] = self.run_config.structured_output_mode
        props["temperature"] = self.run_config.temperature
        props["top_p"] = self.run_config.top_p

        return props

    def update_run_config_unknown_structured_output_mode(self) -> None:
        structured_output_mode = self.run_config.structured_output_mode

        # Old datamodels didn't save the structured output mode, and some clients (tests, end users) might not set it.
        # Look up our recommended mode from ml_model_list if we have one.
        if structured_output_mode == StructuredOutputMode.unknown:
            new_run_config = self.run_config.model_copy(deep=True)
            structured_output_mode = default_structured_output_mode_for_model_provider(
                self.run_config.model_name,
                self.run_config.model_provider_name,
            )
            new_run_config.structured_output_mode = structured_output_mode
            self.run_config = new_run_config

    async def available_tools(self) -> list[KilnToolInterface]:
        tool_config = self.run_config.tools_config
        if tool_config is None or tool_config.tools is None:
            return []

        project = self.task.parent_project()
        if project is None:
            raise ValueError("Task must have a parent project to resolve tools")

        project_id = project.id
        if project_id is None:
            raise ValueError("Project must have an ID to resolve tools")

        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]

        # Check each tool has a unique name
        tool_names = [await tool.name() for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            raise ValueError(
                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
            )

        return tools
@dataclass
class AdapterConfig:

An adapter config holds options that do NOT impact the output of the model.

For example: whether the run is saved, or whether we request additional data like logprobs.

AdapterConfig( allow_saving: bool = True, top_logprobs: int | None = None, default_tags: list[str] | None = None)
allow_saving: bool = True
top_logprobs: int | None = None
default_tags: list[str] | None = None
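
A minimal usage sketch (field names come from the dataclass above; the tag and logprob values are purely illustrative):

from kiln_ai.adapters.model_adapters.base_adapter import AdapterConfig

# Evaluation-style config: don't persist runs, request the top 5 logprobs,
# and tag any generated runs for later filtering.
eval_config = AdapterConfig(
    allow_saving=False,
    top_logprobs=5,
    default_tags=["eval"],
)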
class BaseAdapter:

Base class for AI model adapters that handle task execution.

This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.

task
run_config
prompt_builder
output_schema
input_schema
base_adapter_config
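
A minimal sketch of a concrete adapter. It assumes RunOutput can be constructed with output and intermediate_outputs keyword arguments; the echo behaviour stands in for a real provider call and is illustrative only.

from typing import Tuple

from kiln_ai.adapters.model_adapters.base_adapter import BaseAdapter
from kiln_ai.adapters.run_output import RunOutput
from kiln_ai.datamodel import Usage
from kiln_ai.datamodel.datamodel_enums import InputType


class EchoAdapter(BaseAdapter):
    def adapter_name(self) -> str:
        return "echo_adapter"

    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
        # A real adapter would call the model provider here, typically building
        # the message list via self.build_chat_formatter(input).
        return RunOutput(output=str(input), intermediate_outputs=None), None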
def model_provider(self) -> kiln_ai.adapters.ml_model_list.KilnModelProvider:

Lazy load the model provider for this adapter.
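
A small sketch of inspecting the cached provider record, referencing only fields this module itself reads (parser, reasoning_capable, tuned_chat_strategy); it assumes adapter is a constructed BaseAdapter subclass.

provider = adapter.model_provider()  # resolved once, then cached on the adapter
print(provider.parser, provider.reasoning_capable, provider.tuned_chat_strategy)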

async def invoke( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
async def invoke_returning_run_output( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
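
A sketch of running a task, assuming adapter is a concrete BaseAdapter subclass already constructed with a Task and RunConfigProperties (construction omitted). The dict input assumes a task with a structured input schema; plain-string tasks take a str.

import asyncio


async def run_once(adapter):
    run = await adapter.invoke({"question": "What is 2 + 2?"})
    print(run.output.output)  # model output; JSON-encoded for structured tasks
    print(run.id)             # None when the run was not persisted to disk
    return run

# asyncio.run(run_once(adapter))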
def has_structured_output(self) -> bool:
@abstractmethod
def adapter_name(self) -> str:
def build_prompt(self) -> str:
def build_chat_formatter( self, input: Union[Dict[str, Any], List[Any], str]) -> kiln_ai.adapters.chat.ChatFormatter:
def generate_run( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, run_output: kiln_ai.adapters.run_output.RunOutput, usage: kiln_ai.datamodel.Usage | None = None, trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None) -> kiln_ai.datamodel.TaskRun:
def update_run_config_unknown_structured_output_mode(self) -> None:
async def available_tools(self) -> list[kiln_ai.tools.KilnToolInterface]:
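
A sketch of resolving the configured tools, assuming the run config's tools_config lists tool IDs known to the tool registry; tool names are awaited, matching the method above.

async def configured_tool_names(adapter):
    # Returns [] when no tools are configured on the run config.
    tools = await adapter.available_tools()
    return [await tool.name() for tool in tools]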