kiln_ai.adapters.model_adapters.base_adapter

  1import json
  2from abc import ABCMeta, abstractmethod
  3from dataclasses import dataclass
  4from typing import Dict, Tuple
  5
  6from kiln_ai.adapters.chat.chat_formatter import ChatFormatter, get_chat_formatter
  7from kiln_ai.adapters.ml_model_list import (
  8    KilnModelProvider,
  9    StructuredOutputMode,
 10    default_structured_output_mode_for_model_provider,
 11)
 12from kiln_ai.adapters.parsers.json_parser import parse_json_string
 13from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
 14from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
 15from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id
 16from kiln_ai.adapters.provider_tools import kiln_model_provider_from
 17from kiln_ai.adapters.run_output import RunOutput
 18from kiln_ai.datamodel import (
 19    DataSource,
 20    DataSourceType,
 21    Task,
 22    TaskOutput,
 23    TaskRun,
 24    Usage,
 25)
 26from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
 27from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
 28from kiln_ai.datamodel.run_config import (
 29    KilnAgentRunConfigProperties,
 30    as_kiln_agent_run_config,
 31)
 32from kiln_ai.datamodel.task import RunConfigProperties
 33
 34# Import agent run context for run lifecycle management
 35from kiln_ai.run_context import (
 36    clear_agent_run_id,
 37    generate_agent_run_id,
 38    get_agent_run_id,
 39    set_agent_run_id,
 40)
 41from kiln_ai.tools import KilnToolInterface
 42from kiln_ai.tools.mcp_session_manager import MCPSessionManager
 43from kiln_ai.tools.tool_registry import tool_from_id
 44from kiln_ai.utils.config import Config
 45from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
 46from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam
 47
 48
 49@dataclass
 50class AdapterConfig:
 51    """
 52    An adapter config is config options that do NOT impact the output of the model.
 53
 54    For example: if it's saved, of if we request additional data like logprobs.
 55    """
 56
 57    allow_saving: bool = True
 58    top_logprobs: int | None = None
 59    default_tags: list[str] | None = None
 60
 61    """
 62    A custom prompt builder can be injected to override the system prompt building process.
 63    If not provided, the prompt builder will be created from the run_config.prompt_id which
 64    may load additional files from disk.
 65    """
 66    prompt_builder: BasePromptBuilder | None = None
 67
 68
class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.

    Prompt building is handled internally by the adapter, which uses a prompt builder
    based on the run config. To override the prompt building behavior, pass a custom prompt
    builder to the adapter config.

    Subclasses must implement adapter_name() and _run().
    """

    def __init__(
        self,
        task: Task,
        run_config: RunConfigProperties,
        config: AdapterConfig | None = None,
    ):
        """Initialize the adapter.

        Args:
            task: The Kiln task this adapter executes.
            run_config: Properties describing how to run the task (model, prompt, etc.).
            config: Optional adapter options that do NOT affect model output
                (saving, logprobs, tags, custom prompt builder).
        """
        self.task = task
        self.run_config: RunConfigProperties = run_config
        self.base_adapter_config = config or AdapterConfig()

        # Only Kiln-agent run configs carry a prompt_id / structured output mode.
        # Other run config types (e.g. MCP) get no prompt builder; prompt-related
        # methods raise for them.
        if isinstance(run_config, KilnAgentRunConfigProperties):
            self.update_run_config_unknown_structured_output_mode()
            self.prompt_builder = (
                self.base_adapter_config.prompt_builder
                or prompt_builder_from_id(run_config.prompt_id, task)
            )
        else:
            self.prompt_builder = None
        # Lazily resolved by model_provider()
        self._model_provider: KilnModelProvider | None = None

        # JSON schemas (or None) controlling input/output validation in
        # _run_returning_run_output.
        self.output_schema = task.output_json_schema
        self.input_schema = task.input_json_schema

    def model_provider(self) -> KilnModelProvider:
        """
        Lazy load the model provider for this adapter.

        Raises:
            ValueError: If model name/provider name are missing from the run
                config, or the provider cannot be resolved.
        """
        if self._model_provider is not None:
            return self._model_provider
        run_config = as_kiln_agent_run_config(self.run_config)
        if not run_config.model_name or not run_config.model_provider_name:
            raise ValueError("model_name and model_provider_name must be provided")
        self._model_provider = kiln_model_provider_from(
            run_config.model_name, run_config.model_provider_name
        )
        if not self._model_provider:
            raise ValueError(
                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
            )
        return self._model_provider

    async def invoke(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        """Run the task on the given input and return the resulting TaskRun.

        Convenience wrapper around invoke_returning_run_output that discards
        the raw RunOutput.
        """
        run_output, _ = await self.invoke_returning_run_output(input, input_source)
        return run_output

    async def _run_returning_run_output(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        """Core pipeline: validate input, run the model, parse/validate output,
        then build (and optionally save) a TaskRun.

        Raises:
            RuntimeError: If structured output isn't a dict, unstructured output
                isn't a string, or required reasoning content is missing.
        """
        # validate input, allowing arrays
        if self.input_schema is not None:
            validate_schema_with_value_error(
                input,
                self.input_schema,
                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
                require_object=False,
            )

        # Format model input for model call (we save the original input in the task without formatting)
        formatted_input = input
        formatter_id = self.model_provider().formatter
        if formatter_id is not None:
            formatter = request_formatter_from_id(formatter_id)
            formatted_input = formatter.format_input(input)

        # Run
        run_output, usage = await self._run(formatted_input)

        # Parse with the provider-specific parser (e.g. to strip thinking tags).
        provider = self.model_provider()
        parser = model_parser_from_id(provider.parser)
        parsed_output = parser.parse_output(original_output=run_output)

        # validate output
        if self.output_schema is not None:
            # Parse json to dict if we have structured output
            if isinstance(parsed_output.output, str):
                parsed_output.output = parse_json_string(parsed_output.output)

            if not isinstance(parsed_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {parsed_output.output}"
                )
            validate_schema_with_value_error(
                parsed_output.output,
                self.output_schema,
                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
            )
        else:
            if not isinstance(parsed_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {parsed_output.output}"
                )

        # Validate reasoning content is present and required
        # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini).
        trace_has_toolcalls = parsed_output.trace is not None and any(
            message.get("role", None) == "tool" for message in parsed_output.trace
        )
        if (
            provider.reasoning_capable
            and (
                not parsed_output.intermediate_outputs
                or "reasoning" not in parsed_output.intermediate_outputs
            )
            and not (
                provider.reasoning_optional_for_structured_output
                and self.has_structured_output()
            )
            and not (trace_has_toolcalls)
        ):
            raise RuntimeError(
                "Reasoning is required for this model, but no reasoning was returned."
            )

        # Generate the run and output
        run = self.generate_run(
            input, input_source, parsed_output, usage, run_output.trace
        )

        # Save the run if configured to do so, and we have a path to save to
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run, run_output

    async def invoke_returning_run_output(
        self,
        input: InputType,
        input_source: DataSource | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        """Run the task, managing the agent run lifecycle.

        If no agent run ID is already set (we are the root agent, not a nested
        call), generate one for the duration of the run, clean up any MCP
        session for it afterwards, and always clear the run ID.
        """
        # Determine if this is the root agent (no existing run context)
        is_root_agent = get_agent_run_id() is None

        if is_root_agent:
            run_id = generate_agent_run_id()
            set_agent_run_id(run_id)

        try:
            return await self._run_returning_run_output(input, input_source)
        finally:
            if is_root_agent:
                try:
                    run_id = get_agent_run_id()
                    if run_id:
                        await MCPSessionManager.shared().cleanup_session(run_id)
                finally:
                    # Always clear the context, even if session cleanup fails
                    clear_agent_run_id()

    def has_structured_output(self) -> bool:
        """Return True if the task defines an output JSON schema."""
        return self.output_schema is not None

    @abstractmethod
    def adapter_name(self) -> str:
        """Return a stable identifier for this adapter implementation."""
        pass

    @abstractmethod
    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
        """Execute the model call with the (already formatted) input.

        Returns the raw model output and optional usage data.
        """
        pass

    def build_prompt(self) -> str:
        """Build the system prompt, including JSON formatting instructions when
        the structured output mode calls for them.

        Raises:
            ValueError: If this run config has no prompt builder (e.g. MCP).
        """
        if self.prompt_builder is None:
            raise ValueError("Prompt builder is not available for MCP run config")
        # The prompt builder needs to know if we want to inject formatting instructions
        structured_output_mode = as_kiln_agent_run_config(
            self.run_config
        ).structured_output_mode
        add_json_instructions = self.has_structured_output() and (
            structured_output_mode == StructuredOutputMode.json_instructions
            or structured_output_mode
            == StructuredOutputMode.json_instruction_and_object
        )

        return self.prompt_builder.build_prompt(
            include_json_instructions=add_json_instructions
        )

    def build_chat_formatter(self, input: InputType) -> ChatFormatter:
        """Build a ChatFormatter with the best chat strategy for this run.

        Raises:
            ValueError: If this run config has no prompt builder (e.g. MCP).
        """
        if self.prompt_builder is None:
            raise ValueError("Prompt builder is not available for MCP run config")
        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.

        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
        system_message = self.build_prompt()

        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
        if not cot_prompt:
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn,
                system_message=system_message,
                user_input=input,
            )

        # Some models like finetunes are trained with a specific chat strategy. Use that.
        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
            return get_chat_formatter(
                strategy=tuned_chat_strategy,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

        # Pick the best chat strategy for the model given it has a cot prompt.
        reasoning_capable = self.model_provider().reasoning_capable
        if reasoning_capable:
            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
            # A simple message with the COT prompt appended to the message list is sufficient
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn_r1_thinking,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
        else:
            # Unstructured output with COT
            # Two calls to separate the thinking from the final response
            return get_chat_formatter(
                strategy=ChatStrategy.two_message_cot,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

    # create a run and task output
    def generate_run(
        self,
        input: InputType,
        input_source: DataSource | None,
        run_output: RunOutput,
        usage: Usage | None = None,
        trace: list[ChatCompletionMessageParam] | None = None,
    ) -> TaskRun:
        """Build a TaskRun datamodel object from a parsed run output.

        Args:
            input: The original (unformatted) task input.
            input_source: Where the input came from; defaults to a human source
                attributed to the configured user.
            run_output: Parsed model output and intermediate outputs.
            usage: Optional token/cost usage data.
            trace: Optional full message trace of the model conversation.
        """
        # Convert input and output to JSON strings if they aren't strings
        input_str = (
            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
        )
        output_str = (
            json.dumps(run_output.output, ensure_ascii=False)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        # Synthetic since an adapter, not a human, is creating this
        # Special case for MCP run configs which calls a mcp tool
        output_source_type = (
            DataSourceType.tool_call
            if self.run_config.type == "mcp"
            else DataSourceType.synthetic
        )

        new_task_run = TaskRun(
            parent=self.task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                source=DataSource(
                    type=output_source_type,
                    properties=self._properties_for_task_output(),
                    run_config=self.run_config,
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.base_adapter_config.default_tags or [],
            usage=usage,
            trace=trace,
        )

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        """Build the properties dict stored on the TaskOutput's DataSource."""
        match self.run_config.type:
            case "mcp":
                return {}
            case "kiln_agent":
                if not isinstance(self.run_config, KilnAgentRunConfigProperties):
                    raise ValueError("Kiln agent run config is required")
                run_config = self.run_config

                props: Dict[str, str | int | float] = {}
                props["adapter_name"] = self.adapter_name()
                # Legacy properties where we save the run_config details into custom properties.
                # These are now also saved in the run_config field.
                props["model_name"] = run_config.model_name
                props["model_provider"] = run_config.model_provider_name
                props["prompt_id"] = run_config.prompt_id
                props["structured_output_mode"] = run_config.structured_output_mode
                props["temperature"] = run_config.temperature
                props["top_p"] = run_config.top_p

                return props
            case _:
                raise_exhaustive_enum_error(self.run_config.type)

    def update_run_config_unknown_structured_output_mode(self) -> None:
        """Replace an 'unknown' structured output mode with the recommended
        default for the configured model/provider.

        No-op for non-kiln_agent run configs or when a mode is already set.
        Replaces self.run_config with a deep copy rather than mutating it.
        """
        if self.run_config.type != "kiln_agent":
            return
        run_config = as_kiln_agent_run_config(self.run_config)
        structured_output_mode = run_config.structured_output_mode

        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
        # Look up our recommended mode from ml_model_list if we have one
        if structured_output_mode == StructuredOutputMode.unknown:
            new_run_config = run_config.model_copy(deep=True)
            structured_output_mode = default_structured_output_mode_for_model_provider(
                run_config.model_name,
                run_config.model_provider_name,
            )
            new_run_config.structured_output_mode = structured_output_mode
            self.run_config = new_run_config

    async def available_tools(self) -> list[KilnToolInterface]:
        """Resolve the tools configured on this run config.

        Returns an empty list for non-kiln_agent run configs or when no tools
        are configured.

        Raises:
            ValueError: If the task has no parent project, the project has no
                ID, or two tools share a name.
        """
        if self.run_config.type != "kiln_agent":
            return []
        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
        if tool_config is None or tool_config.tools is None:
            return []

        project = self.task.parent_project()
        if project is None:
            raise ValueError("Task must have a parent project to resolve tools")

        project_id = project.id
        if project_id is None:
            raise ValueError("Project must have an ID to resolve tools")

        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]

        # Check each tool has a unique name
        tool_names = [await tool.name() for tool in tools]
        if len(tool_names) != len(set(tool_names)):
            raise ValueError(
                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
            )

        return tools
@dataclass
class AdapterConfig:
    """
    An adapter config is config options that do NOT impact the output of the model.

    For example: if it's saved, or if we request additional data like logprobs.
    """

    # Whether the resulting run may be persisted to disk (subject to app config).
    allow_saving: bool = True
    # Number of top logprobs to request from the model, or None to not request any.
    top_logprobs: int | None = None
    # Tags applied to every run generated with this adapter, or None for no tags.
    default_tags: list[str] | None = None
    # A custom prompt builder can be injected to override the system prompt building process.
    # If not provided, the prompt builder will be created from the run_config.prompt_id which
    # may load additional files from disk.
    prompt_builder: BasePromptBuilder | None = None

An adapter config is config options that do NOT impact the output of the model.

For example: if it's saved, or if we request additional data like logprobs.

AdapterConfig( allow_saving: bool = True, top_logprobs: int | None = None, default_tags: list[str] | None = None, prompt_builder: kiln_ai.adapters.prompt_builders.BasePromptBuilder | None = None)
allow_saving: bool = True
top_logprobs: int | None = None
default_tags: list[str] | None = None

A custom prompt builder can be injected to override the system prompt building process. If not provided, the prompt builder will be created from the run_config.prompt_id which may load additional files from disk.

class BaseAdapter:
 70class BaseAdapter(metaclass=ABCMeta):
 71    """Base class for AI model adapters that handle task execution.
 72
 73    This abstract class provides the foundation for implementing model-specific adapters
 74    that can process tasks with structured or unstructured inputs/outputs. It handles
 75    input/output validation, prompt building, and run tracking.
 76
 77    Prompt building is handled internally by the adapter, which uses a prompt builder
 78    based on the run config. To override the prompt building behavior, pass a custom prompt
 79    builder to the adapter config.
 80    """
 81
 82    def __init__(
 83        self,
 84        task: Task,
 85        run_config: RunConfigProperties,
 86        config: AdapterConfig | None = None,
 87    ):
 88        self.task = task
 89        self.run_config: RunConfigProperties = run_config
 90        self.base_adapter_config = config or AdapterConfig()
 91
 92        if isinstance(run_config, KilnAgentRunConfigProperties):
 93            self.update_run_config_unknown_structured_output_mode()
 94            self.prompt_builder = (
 95                self.base_adapter_config.prompt_builder
 96                or prompt_builder_from_id(run_config.prompt_id, task)
 97            )
 98        else:
 99            self.prompt_builder = None
100        self._model_provider: KilnModelProvider | None = None
101
102        self.output_schema = task.output_json_schema
103        self.input_schema = task.input_json_schema
104
105    def model_provider(self) -> KilnModelProvider:
106        """
107        Lazy load the model provider for this adapter.
108        """
109        if self._model_provider is not None:
110            return self._model_provider
111        run_config = as_kiln_agent_run_config(self.run_config)
112        if not run_config.model_name or not run_config.model_provider_name:
113            raise ValueError("model_name and model_provider_name must be provided")
114        self._model_provider = kiln_model_provider_from(
115            run_config.model_name, run_config.model_provider_name
116        )
117        if not self._model_provider:
118            raise ValueError(
119                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
120            )
121        return self._model_provider
122
123    async def invoke(
124        self,
125        input: InputType,
126        input_source: DataSource | None = None,
127    ) -> TaskRun:
128        run_output, _ = await self.invoke_returning_run_output(input, input_source)
129        return run_output
130
131    async def _run_returning_run_output(
132        self,
133        input: InputType,
134        input_source: DataSource | None = None,
135    ) -> Tuple[TaskRun, RunOutput]:
136        # validate input, allowing arrays
137        if self.input_schema is not None:
138            validate_schema_with_value_error(
139                input,
140                self.input_schema,
141                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
142                require_object=False,
143            )
144
145        # Format model input for model call (we save the original input in the task without formatting)
146        formatted_input = input
147        formatter_id = self.model_provider().formatter
148        if formatter_id is not None:
149            formatter = request_formatter_from_id(formatter_id)
150            formatted_input = formatter.format_input(input)
151
152        # Run
153        run_output, usage = await self._run(formatted_input)
154
155        # Parse
156        provider = self.model_provider()
157        parser = model_parser_from_id(provider.parser)
158        parsed_output = parser.parse_output(original_output=run_output)
159
160        # validate output
161        if self.output_schema is not None:
162            # Parse json to dict if we have structured output
163            if isinstance(parsed_output.output, str):
164                parsed_output.output = parse_json_string(parsed_output.output)
165
166            if not isinstance(parsed_output.output, dict):
167                raise RuntimeError(
168                    f"structured response is not a dict: {parsed_output.output}"
169                )
170            validate_schema_with_value_error(
171                parsed_output.output,
172                self.output_schema,
173                "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
174            )
175        else:
176            if not isinstance(parsed_output.output, str):
177                raise RuntimeError(
178                    f"response is not a string for non-structured task: {parsed_output.output}"
179                )
180
181        # Validate reasoning content is present and required
182        # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini).
183        trace_has_toolcalls = parsed_output.trace is not None and any(
184            message.get("role", None) == "tool" for message in parsed_output.trace
185        )
186        if (
187            provider.reasoning_capable
188            and (
189                not parsed_output.intermediate_outputs
190                or "reasoning" not in parsed_output.intermediate_outputs
191            )
192            and not (
193                provider.reasoning_optional_for_structured_output
194                and self.has_structured_output()
195            )
196            and not (trace_has_toolcalls)
197        ):
198            raise RuntimeError(
199                "Reasoning is required for this model, but no reasoning was returned."
200            )
201
202        # Generate the run and output
203        run = self.generate_run(
204            input, input_source, parsed_output, usage, run_output.trace
205        )
206
207        # Save the run if configured to do so, and we have a path to save to
208        if (
209            self.base_adapter_config.allow_saving
210            and Config.shared().autosave_runs
211            and self.task.path is not None
212        ):
213            run.save_to_file()
214        else:
215            # Clear the ID to indicate it's not persisted
216            run.id = None
217
218        return run, run_output
219
220    async def invoke_returning_run_output(
221        self,
222        input: InputType,
223        input_source: DataSource | None = None,
224    ) -> Tuple[TaskRun, RunOutput]:
225        # Determine if this is the root agent (no existing run context)
226        is_root_agent = get_agent_run_id() is None
227
228        if is_root_agent:
229            run_id = generate_agent_run_id()
230            set_agent_run_id(run_id)
231
232        try:
233            return await self._run_returning_run_output(input, input_source)
234        finally:
235            if is_root_agent:
236                try:
237                    run_id = get_agent_run_id()
238                    if run_id:
239                        await MCPSessionManager.shared().cleanup_session(run_id)
240                finally:
241                    clear_agent_run_id()
242
243    def has_structured_output(self) -> bool:
244        return self.output_schema is not None
245
246    @abstractmethod
247    def adapter_name(self) -> str:
248        pass
249
250    @abstractmethod
251    async def _run(self, input: InputType) -> Tuple[RunOutput, Usage | None]:
252        pass
253
254    def build_prompt(self) -> str:
255        if self.prompt_builder is None:
256            raise ValueError("Prompt builder is not available for MCP run config")
257        # The prompt builder needs to know if we want to inject formatting instructions
258        structured_output_mode = as_kiln_agent_run_config(
259            self.run_config
260        ).structured_output_mode
261        add_json_instructions = self.has_structured_output() and (
262            structured_output_mode == StructuredOutputMode.json_instructions
263            or structured_output_mode
264            == StructuredOutputMode.json_instruction_and_object
265        )
266
267        return self.prompt_builder.build_prompt(
268            include_json_instructions=add_json_instructions
269        )
270
271    def build_chat_formatter(self, input: InputType) -> ChatFormatter:
272        if self.prompt_builder is None:
273            raise ValueError("Prompt builder is not available for MCP run config")
274        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
275
276        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
277        system_message = self.build_prompt()
278
279        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
280        if not cot_prompt:
281            return get_chat_formatter(
282                strategy=ChatStrategy.single_turn,
283                system_message=system_message,
284                user_input=input,
285            )
286
287        # Some models like finetunes are trained with a specific chat strategy. Use that.
288        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
289        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
290        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
291            return get_chat_formatter(
292                strategy=tuned_chat_strategy,
293                system_message=system_message,
294                user_input=input,
295                thinking_instructions=cot_prompt,
296            )
297
298        # Pick the best chat strategy for the model given it has a cot prompt.
299        reasoning_capable = self.model_provider().reasoning_capable
300        if reasoning_capable:
301            # "Thinking" LLM designed to output thinking in a structured format. We'll use it's native format.
302            # A simple message with the COT prompt appended to the message list is sufficient
303            return get_chat_formatter(
304                strategy=ChatStrategy.single_turn_r1_thinking,
305                system_message=system_message,
306                user_input=input,
307                thinking_instructions=cot_prompt,
308            )
309        else:
310            # Unstructured output with COT
311            # Two calls to separate the thinking from the final response
312            return get_chat_formatter(
313                strategy=ChatStrategy.two_message_cot,
314                system_message=system_message,
315                user_input=input,
316                thinking_instructions=cot_prompt,
317            )
318
319    # create a run and task output
320    def generate_run(
321        self,
322        input: InputType,
323        input_source: DataSource | None,
324        run_output: RunOutput,
325        usage: Usage | None = None,
326        trace: list[ChatCompletionMessageParam] | None = None,
327    ) -> TaskRun:
328        # Convert input and output to JSON strings if they aren't strings
329        input_str = (
330            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
331        )
332        output_str = (
333            json.dumps(run_output.output, ensure_ascii=False)
334            if isinstance(run_output.output, dict)
335            else run_output.output
336        )
337
338        # If no input source is provided, use the human data source
339        if input_source is None:
340            input_source = DataSource(
341                type=DataSourceType.human,
342                properties={"created_by": Config.shared().user_id},
343            )
344
345        # Synthetic since an adapter, not a human, is creating this
346        # Special case for MCP run configs which calls a mcp tool
347        output_source_type = (
348            DataSourceType.tool_call
349            if self.run_config.type == "mcp"
350            else DataSourceType.synthetic
351        )
352
353        new_task_run = TaskRun(
354            parent=self.task,
355            input=input_str,
356            input_source=input_source,
357            output=TaskOutput(
358                output=output_str,
359                source=DataSource(
360                    type=output_source_type,
361                    properties=self._properties_for_task_output(),
362                    run_config=self.run_config,
363                ),
364            ),
365            intermediate_outputs=run_output.intermediate_outputs,
366            tags=self.base_adapter_config.default_tags or [],
367            usage=usage,
368            trace=trace,
369        )
370
371        return new_task_run
372
373    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
374        match self.run_config.type:
375            case "mcp":
376                return {}
377            case "kiln_agent":
378                if not isinstance(self.run_config, KilnAgentRunConfigProperties):
379                    raise ValueError("Kiln agent run config is required")
380                run_config = self.run_config
381
382                props: Dict[str, str | int | float] = {}
383                props["adapter_name"] = self.adapter_name()
384                # Legacy properties where we save the run_config details into custom properties.
385                # These are now also be saved in the run_config field.
386                props["model_name"] = run_config.model_name
387                props["model_provider"] = run_config.model_provider_name
388                props["prompt_id"] = run_config.prompt_id
389                props["structured_output_mode"] = run_config.structured_output_mode
390                props["temperature"] = run_config.temperature
391                props["top_p"] = run_config.top_p
392
393                return props
394            case _:
395                raise_exhaustive_enum_error(self.run_config.type)
396
397    def update_run_config_unknown_structured_output_mode(self) -> None:
398        if self.run_config.type != "kiln_agent":
399            return
400        run_config = as_kiln_agent_run_config(self.run_config)
401        structured_output_mode = run_config.structured_output_mode
402
403        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
404        # Look up our recommended mode from ml_model_list if we have one
405        if structured_output_mode == StructuredOutputMode.unknown:
406            new_run_config = run_config.model_copy(deep=True)
407            structured_output_mode = default_structured_output_mode_for_model_provider(
408                run_config.model_name,
409                run_config.model_provider_name,
410            )
411            new_run_config.structured_output_mode = structured_output_mode
412            self.run_config = new_run_config
413
414    async def available_tools(self) -> list[KilnToolInterface]:
415        if self.run_config.type != "kiln_agent":
416            return []
417        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
418        if tool_config is None or tool_config.tools is None:
419            return []
420
421        project = self.task.parent_project()
422        if project is None:
423            raise ValueError("Task must have a parent project to resolve tools")
424
425        project_id = project.id
426        if project_id is None:
427            raise ValueError("Project must have an ID to resolve tools")
428
429        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]
430
431        # Check each tool has a unique name
432        tool_names = [await tool.name() for tool in tools]
433        if len(tool_names) != len(set(tool_names)):
434            raise ValueError(
435                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
436            )
437
438        return tools

Base class for AI model adapters that handle task execution.

This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.

Prompt building is handled internally by the adapter, which uses a prompt builder based on the run config. To override the prompt building behavior, pass a custom prompt builder to the adapter config.

task
run_config: Annotated[Union[Annotated[kiln_ai.datamodel.run_config.KilnAgentRunConfigProperties, Tag(tag='kiln_agent')], Annotated[kiln_ai.datamodel.run_config.McpRunConfigProperties, Tag(tag='mcp')]], Discriminator(discriminator=<function _get_run_config_type at 0x7f5e59c4de40>, custom_error_type=None, custom_error_message=None, custom_error_context=None)]
base_adapter_config
output_schema
input_schema
def model_provider(self) -> kiln_ai.adapters.ml_model_list.KilnModelProvider:
105    def model_provider(self) -> KilnModelProvider:
106        """
107        Lazy load the model provider for this adapter.
108        """
109        if self._model_provider is not None:
110            return self._model_provider
111        run_config = as_kiln_agent_run_config(self.run_config)
112        if not run_config.model_name or not run_config.model_provider_name:
113            raise ValueError("model_name and model_provider_name must be provided")
114        self._model_provider = kiln_model_provider_from(
115            run_config.model_name, run_config.model_provider_name
116        )
117        if not self._model_provider:
118            raise ValueError(
119                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
120            )
121        return self._model_provider

Lazy load the model provider for this adapter.

async def invoke( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
123    async def invoke(
124        self,
125        input: InputType,
126        input_source: DataSource | None = None,
127    ) -> TaskRun:
128        run_output, _ = await self.invoke_returning_run_output(input, input_source)
129        return run_output
async def invoke_returning_run_output( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
220    async def invoke_returning_run_output(
221        self,
222        input: InputType,
223        input_source: DataSource | None = None,
224    ) -> Tuple[TaskRun, RunOutput]:
225        # Determine if this is the root agent (no existing run context)
226        is_root_agent = get_agent_run_id() is None
227
228        if is_root_agent:
229            run_id = generate_agent_run_id()
230            set_agent_run_id(run_id)
231
232        try:
233            return await self._run_returning_run_output(input, input_source)
234        finally:
235            if is_root_agent:
236                try:
237                    run_id = get_agent_run_id()
238                    if run_id:
239                        await MCPSessionManager.shared().cleanup_session(run_id)
240                finally:
241                    clear_agent_run_id()
def has_structured_output(self) -> bool:
243    def has_structured_output(self) -> bool:
244        return self.output_schema is not None
@abstractmethod
def adapter_name(self) -> str:
246    @abstractmethod
247    def adapter_name(self) -> str:
248        pass
def build_prompt(self) -> str:
254    def build_prompt(self) -> str:
255        if self.prompt_builder is None:
256            raise ValueError("Prompt builder is not available for MCP run config")
257        # The prompt builder needs to know if we want to inject formatting instructions
258        structured_output_mode = as_kiln_agent_run_config(
259            self.run_config
260        ).structured_output_mode
261        add_json_instructions = self.has_structured_output() and (
262            structured_output_mode == StructuredOutputMode.json_instructions
263            or structured_output_mode
264            == StructuredOutputMode.json_instruction_and_object
265        )
266
267        return self.prompt_builder.build_prompt(
268            include_json_instructions=add_json_instructions
269        )
def build_chat_formatter( self, input: Union[Dict[str, Any], List[Any], str]) -> kiln_ai.adapters.chat.ChatFormatter:
271    def build_chat_formatter(self, input: InputType) -> ChatFormatter:
272        if self.prompt_builder is None:
273            raise ValueError("Prompt builder is not available for MCP run config")
274        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
275
276        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
277        system_message = self.build_prompt()
278
279        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
280        if not cot_prompt:
281            return get_chat_formatter(
282                strategy=ChatStrategy.single_turn,
283                system_message=system_message,
284                user_input=input,
285            )
286
287        # Some models like finetunes are trained with a specific chat strategy. Use that.
288        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
289        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
290        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
291            return get_chat_formatter(
292                strategy=tuned_chat_strategy,
293                system_message=system_message,
294                user_input=input,
295                thinking_instructions=cot_prompt,
296            )
297
298        # Pick the best chat strategy for the model given it has a cot prompt.
299        reasoning_capable = self.model_provider().reasoning_capable
300        if reasoning_capable:
301            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
302            # A simple message with the COT prompt appended to the message list is sufficient
303            return get_chat_formatter(
304                strategy=ChatStrategy.single_turn_r1_thinking,
305                system_message=system_message,
306                user_input=input,
307                thinking_instructions=cot_prompt,
308            )
309        else:
310            # Unstructured output with COT
311            # Two calls to separate the thinking from the final response
312            return get_chat_formatter(
313                strategy=ChatStrategy.two_message_cot,
314                system_message=system_message,
315                user_input=input,
316                thinking_instructions=cot_prompt,
317            )
def generate_run( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, run_output: kiln_ai.adapters.run_output.RunOutput, usage: kiln_ai.datamodel.Usage | None = None, trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None) -> kiln_ai.datamodel.TaskRun:
320    def generate_run(
321        self,
322        input: InputType,
323        input_source: DataSource | None,
324        run_output: RunOutput,
325        usage: Usage | None = None,
326        trace: list[ChatCompletionMessageParam] | None = None,
327    ) -> TaskRun:
328        # Convert input and output to JSON strings if they aren't strings
329        input_str = (
330            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
331        )
332        output_str = (
333            json.dumps(run_output.output, ensure_ascii=False)
334            if isinstance(run_output.output, dict)
335            else run_output.output
336        )
337
338        # If no input source is provided, use the human data source
339        if input_source is None:
340            input_source = DataSource(
341                type=DataSourceType.human,
342                properties={"created_by": Config.shared().user_id},
343            )
344
345        # Synthetic since an adapter, not a human, is creating this
346        # Special case for MCP run configs, which call an MCP tool
347        output_source_type = (
348            DataSourceType.tool_call
349            if self.run_config.type == "mcp"
350            else DataSourceType.synthetic
351        )
352
353        new_task_run = TaskRun(
354            parent=self.task,
355            input=input_str,
356            input_source=input_source,
357            output=TaskOutput(
358                output=output_str,
359                source=DataSource(
360                    type=output_source_type,
361                    properties=self._properties_for_task_output(),
362                    run_config=self.run_config,
363                ),
364            ),
365            intermediate_outputs=run_output.intermediate_outputs,
366            tags=self.base_adapter_config.default_tags or [],
367            usage=usage,
368            trace=trace,
369        )
370
371        return new_task_run
def update_run_config_unknown_structured_output_mode(self) -> None:
397    def update_run_config_unknown_structured_output_mode(self) -> None:
398        if self.run_config.type != "kiln_agent":
399            return
400        run_config = as_kiln_agent_run_config(self.run_config)
401        structured_output_mode = run_config.structured_output_mode
402
403        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
404        # Look up our recommended mode from ml_model_list if we have one
405        if structured_output_mode == StructuredOutputMode.unknown:
406            new_run_config = run_config.model_copy(deep=True)
407            structured_output_mode = default_structured_output_mode_for_model_provider(
408                run_config.model_name,
409                run_config.model_provider_name,
410            )
411            new_run_config.structured_output_mode = structured_output_mode
412            self.run_config = new_run_config
async def available_tools(self) -> list[kiln_ai.tools.KilnToolInterface]:
414    async def available_tools(self) -> list[KilnToolInterface]:
415        if self.run_config.type != "kiln_agent":
416            return []
417        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
418        if tool_config is None or tool_config.tools is None:
419            return []
420
421        project = self.task.parent_project()
422        if project is None:
423            raise ValueError("Task must have a parent project to resolve tools")
424
425        project_id = project.id
426        if project_id is None:
427            raise ValueError("Project must have an ID to resolve tools")
428
429        tools = [tool_from_id(tool_id, self.task) for tool_id in tool_config.tools]
430
431        # Check each tool has a unique name
432        tool_names = [await tool.name() for tool in tools]
433        if len(tool_names) != len(set(tool_names)):
434            raise ValueError(
435                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
436            )
437
438        return tools