kiln_ai.adapters.model_adapters.base_adapter

  1from __future__ import annotations
  2
  3import json
  4import uuid
  5from abc import ABCMeta, abstractmethod
  6from dataclasses import dataclass
  7from typing import TYPE_CHECKING, AsyncIterator, Dict, Tuple
  8
  9from litellm.types.utils import ModelResponseStream
 10
 11from kiln_ai.adapters.chat.chat_formatter import (
 12    ChatFormatter,
 13    MultiturnFormatter,
 14    get_chat_formatter,
 15)
 16from kiln_ai.adapters.ml_model_list import (
 17    KilnModelProvider,
 18    StructuredOutputMode,
 19    default_structured_output_mode_for_model_provider,
 20)
 21from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStreamResult
 22from kiln_ai.adapters.model_adapters.stream_events import (
 23    AiSdkStreamConverter,
 24    AiSdkStreamEvent,
 25    FinishEvent,
 26    FinishMessageMetadata,
 27    FinishStepEvent,
 28    StartEvent,
 29    StartStepEvent,
 30    ToolCallEvent,
 31)
 32from kiln_ai.adapters.parsers.json_parser import parse_json_string
 33from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
 34from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
 35from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id
 36from kiln_ai.adapters.provider_tools import kiln_model_provider_from
 37from kiln_ai.adapters.run_output import RunOutput
 38from kiln_ai.datamodel import (
 39    DataSource,
 40    DataSourceType,
 41    MessageUsage,
 42    Task,
 43    TaskOutput,
 44    TaskRun,
 45    Usage,
 46)
 47from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
 48from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
 49from kiln_ai.datamodel.run_config import (
 50    KilnAgentRunConfigProperties,
 51    as_kiln_agent_run_config,
 52)
 53from kiln_ai.datamodel.skill import Skill
 54from kiln_ai.datamodel.task import RunConfigProperties
 55from kiln_ai.datamodel.tool_id import SKILL_TOOL_ID_PREFIX, skill_id_from_tool_id
 56
 57# Import agent run context for run lifecycle management
 58from kiln_ai.run_context import (
 59    clear_agent_run_id,
 60    generate_agent_run_id,
 61    get_agent_run_id,
 62    set_agent_run_id,
 63)
 64from kiln_ai.tools import KilnToolInterface
 65from kiln_ai.tools.mcp_session_manager import MCPSessionManager
 66from kiln_ai.tools.skill_tool import SkillTool
 67from kiln_ai.tools.tool_registry import tool_from_id
 68from kiln_ai.utils.config import Config
 69from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
 70from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam
 71
 72if TYPE_CHECKING:
 73    from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStream
 74
 75SkillsDict = Dict[str, Skill]
 76
 77
 78@dataclass
 79class AdapterConfig:
 80    """
 81    An adapter config is config options that do NOT impact the output of the model.
 82
 83    For example: if it's saved, of if we request additional data like logprobs.
 84    """
 85
 86    allow_saving: bool = True
 87    top_logprobs: int | None = None
 88    default_tags: list[str] | None = None
 89
 90    """
 91    A custom prompt builder can be injected to override the system prompt building process.
 92    If not provided, the prompt builder will be created from the run_config.prompt_id which
 93    may load additional files from disk.
 94    """
 95    prompt_builder: BasePromptBuilder | None = None
 96
 97    """
 98    Pre-loaded skills keyed by skill ID. When the run config references skills,
 99    they are looked up from this dict instead of reading from the filesystem.
100    Use load_skills_for_task() to build this dict.
101    """
102    skills: SkillsDict | None = None
103
104    """
105    When True, the adapter will stop and return control to the caller when a tool call
106    is invoked, instead of processing tool calls internally. Default is False (process
107    tool calls internally).
108    """
109    return_on_tool_call: bool = False
110
111    """
112    Extra tools provided directly by the caller, in addition to tools resolved from the
113    task's tool registry. These are sent to the model together with registry tools, and
114    their names must not collide with registry tool names.
115
116    If ``return_on_tool_call`` is False (the default), the adapter executes these tools
117    itself just like registry tools. If True, the adapter returns as soon as the model
118    requests a tool call and the caller is responsible for running the tool and passing
119    results back via ``prior_trace``.
120    """
121    unmanaged_tools: list[KilnToolInterface] | None = None
122
123    """
124    When True, automatically inject prompt caching hints into completion
125    requests. This is a cost optimization and does not affect model output.
126    """
127    automatic_prompt_caching: bool = False
128
129
130class BaseAdapter(metaclass=ABCMeta):
131    """Base class for AI model adapters that handle task execution.
132
133    This abstract class provides the foundation for implementing model-specific adapters
134    that can process tasks with structured or unstructured inputs/outputs. It handles
135    input/output validation, prompt building, and run tracking.
136
137    Prompt building is handled internally by the adapter, which uses a prompt builder
138    based on the run config. To override the prompt building behavior, pass a custom prompt
139    builder to the adapter config.
140    """
141
142    def __init__(
143        self,
144        task: Task,
145        run_config: RunConfigProperties,
146        config: AdapterConfig | None = None,
147    ):
148        self.task = task
149        self.run_config: RunConfigProperties = run_config
150        self.base_adapter_config = config or AdapterConfig()
151
152        if isinstance(run_config, KilnAgentRunConfigProperties):
153            self.update_run_config_unknown_structured_output_mode()
154            self.prompt_builder = (
155                self.base_adapter_config.prompt_builder
156                or prompt_builder_from_id(run_config.prompt_id, task)
157            )
158        else:
159            self.prompt_builder = None
160        self._model_provider: KilnModelProvider | None = None
161        self._resolved_skills: list[Skill] | None = None
162
163        self.output_schema = task.output_json_schema
164        self.input_schema = task.input_json_schema
165
166    def model_provider(self) -> KilnModelProvider:
167        """
168        Lazy load the model provider for this adapter.
169        """
170        if self._model_provider is not None:
171            return self._model_provider
172        run_config = as_kiln_agent_run_config(self.run_config)
173        if not run_config.model_name or not run_config.model_provider_name:
174            raise ValueError("model_name and model_provider_name must be provided")
175        self._model_provider = kiln_model_provider_from(
176            run_config.model_name, run_config.model_provider_name
177        )
178        if not self._model_provider:
179            raise ValueError(
180                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
181            )
182        return self._model_provider
183
184    @staticmethod
185    def _normalize_prior_trace(
186        prior_trace: list[ChatCompletionMessageParam] | None,
187    ) -> list[ChatCompletionMessageParam] | None:
188        if not prior_trace:
189            return None
190        return prior_trace
191
192    def _reject_multiturn_with_structured_input(
193        self,
194        prior_trace: list[ChatCompletionMessageParam] | None,
195    ) -> None:
196        if prior_trace is not None and self.input_schema is not None:
197            raise ValueError(
198                "Cannot run multiturn execution with a task that has a structured input schema. "
199                "Use an unstructured task, or call without prior_trace."
200            )
201
202    async def invoke(
203        self,
204        input: InputType,
205        input_source: DataSource | None = None,
206        prior_trace: list[ChatCompletionMessageParam] | None = None,
207        parent_task_run: TaskRun | None = None,
208    ) -> TaskRun:
209        task_run, _ = await self.invoke_returning_run_output(
210            input, input_source, prior_trace, parent_task_run
211        )
212        return task_run
213
214    async def _run_returning_run_output(
215        self,
216        input: InputType,
217        input_source: DataSource | None = None,
218        prior_trace: list[ChatCompletionMessageParam] | None = None,
219        parent_task_run: TaskRun | None = None,
220    ) -> Tuple[TaskRun, RunOutput]:
221        prior_trace = self._normalize_prior_trace(prior_trace)
222        self._reject_multiturn_with_structured_input(prior_trace)
223
224        # validate input, allowing arrays
225        if self.input_schema is not None:
226            validate_schema_with_value_error(
227                input,
228                self.input_schema,
229                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
230                require_object=False,
231            )
232
233        # Format model input for model call (we save the original input in the task without formatting)
234        formatted_input = input
235        formatter_id = self.model_provider().formatter
236        if formatter_id is not None:
237            formatter = request_formatter_from_id(formatter_id)
238            formatted_input = formatter.format_input(input)
239
240        # Run
241        run_output, usage = await self._run(formatted_input, prior_trace=prior_trace)
242
243        if not run_output.is_toolcall_pending:
244            # Normal completion: parse and validate output
245            provider = self.model_provider()
246            parser = model_parser_from_id(provider.parser)
247            parsed_output = parser.parse_output(original_output=run_output)
248
249            # validate output
250            if self.output_schema is not None:
251                # Parse json to dict if we have structured output
252                if isinstance(parsed_output.output, str):
253                    parsed_output.output = parse_json_string(parsed_output.output)
254
255                if not isinstance(parsed_output.output, dict):
256                    raise RuntimeError(
257                        f"structured response is not a dict: {parsed_output.output}"
258                    )
259                validate_schema_with_value_error(
260                    parsed_output.output,
261                    self.output_schema,
262                    "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
263                )
264            else:
265                if not isinstance(parsed_output.output, str):
266                    raise RuntimeError(
267                        f"response is not a string for non-structured task: {parsed_output.output}"
268                    )
269
270            trace_has_toolcalls = parsed_output.trace is not None and any(
271                message.get("role", None) == "tool" for message in parsed_output.trace
272            )
273
274            # Validate reasoning content is present and required
275            # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini).
276            if (
277                provider.reasoning_capable
278                and (
279                    not parsed_output.intermediate_outputs
280                    or "reasoning" not in parsed_output.intermediate_outputs
281                )
282                and not (
283                    provider.reasoning_optional_for_structured_output
284                    and self.has_structured_output()
285                )
286                and not trace_has_toolcalls
287            ):
288                raise RuntimeError(
289                    "Reasoning is required for this model, but no reasoning was returned."
290                )
291
292            run_output = parsed_output
293
294        run = self.generate_run(
295            input, input_source, run_output, usage, run_output.trace, parent_task_run
296        )
297
298        # Save the run if configured to do so, and we have a path to save to
299        if (
300            self.base_adapter_config.allow_saving
301            and Config.shared().autosave_runs
302            and self.task.path is not None
303        ):
304            run.save_to_file()
305        else:
306            # Clear the ID to indicate it's not persisted
307            run.id = None
308
309        return run, run_output
310
311    async def invoke_returning_run_output(
312        self,
313        input: InputType,
314        input_source: DataSource | None = None,
315        prior_trace: list[ChatCompletionMessageParam] | None = None,
316        parent_task_run: TaskRun | None = None,
317    ) -> Tuple[TaskRun, RunOutput]:
318        # Determine if this is the root agent (no existing run context)
319        is_root_agent = get_agent_run_id() is None
320
321        if is_root_agent:
322            run_id = generate_agent_run_id()
323            set_agent_run_id(run_id)
324
325        try:
326            return await self._run_returning_run_output(
327                input, input_source, prior_trace, parent_task_run
328            )
329        finally:
330            if is_root_agent:
331                try:
332                    run_id = get_agent_run_id()
333                    if run_id:
334                        await MCPSessionManager.shared().cleanup_session(run_id)
335                finally:
336                    clear_agent_run_id()
337
338    def invoke_openai_stream(
339        self,
340        input: InputType,
341        input_source: DataSource | None = None,
342        prior_trace: list[ChatCompletionMessageParam] | None = None,
343        parent_task_run: TaskRun | None = None,
344    ) -> OpenAIStreamResult:
345        """Stream raw OpenAI-protocol chunks for the task execution.
346
347        Returns an async-iterable that yields ``ModelResponseStream`` chunks
348        as they arrive from the model.  After the iterator is exhausted the
349        run has been validated and saved (when configured).  The resulting
350        ``TaskRun`` is available via the ``.task_run`` property.
351
352        Tool-call rounds happen internally and are not surfaced; use
353        ``invoke_ai_sdk_stream`` if you need tool-call events.
354        """
355        return OpenAIStreamResult(
356            self, input, input_source, prior_trace, parent_task_run
357        )
358
359    def invoke_ai_sdk_stream(
360        self,
361        input: InputType,
362        input_source: DataSource | None = None,
363        prior_trace: list[ChatCompletionMessageParam] | None = None,
364        parent_task_run: TaskRun | None = None,
365    ) -> AiSdkStreamResult:
366        """Stream AI SDK protocol events for the task execution.
367
368        Returns an async-iterable that yields ``AiSdkStreamEvent`` instances
369        covering text, reasoning, tool-call lifecycle, step boundaries, and
370        control events.  After the iterator is exhausted the resulting
371        ``TaskRun`` is available via the ``.task_run`` property.
372        """
373        return AiSdkStreamResult(
374            self, input, input_source, prior_trace, parent_task_run
375        )
376
377    def _prepare_stream(
378        self,
379        input: InputType,
380        prior_trace: list[ChatCompletionMessageParam] | None,
381    ) -> AdapterStream:
382        prior_trace = self._normalize_prior_trace(prior_trace)
383        self._reject_multiturn_with_structured_input(prior_trace)
384
385        if self.input_schema is not None:
386            validate_schema_with_value_error(
387                input,
388                self.input_schema,
389                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
390                require_object=False,
391            )
392
393        formatted_input = input
394        formatter_id = self.model_provider().formatter
395        if formatter_id is not None:
396            formatter = request_formatter_from_id(formatter_id)
397            formatted_input = formatter.format_input(input)
398
399        return self._create_run_stream(formatted_input, prior_trace)
400
401    def _finalize_stream(
402        self,
403        adapter_stream: AdapterStream,
404        input: InputType,
405        input_source: DataSource | None,
406        parent_task_run: TaskRun | None = None,
407    ) -> TaskRun:
408        """Streaming invocations are only concerned with passing through events as they come in.
409        At the end of the stream, we still need to validate the output, create a run and everything
410        else that a non-streaming invocation would do.
411        """
412
413        result: AdapterStreamResult = adapter_stream.result
414        run_output = result.run_output
415        usage = result.usage
416
417        if not run_output.is_toolcall_pending:
418            # Normal completion: parse and validate output
419            provider = self.model_provider()
420            parser = model_parser_from_id(provider.parser)
421            parsed_output = parser.parse_output(original_output=run_output)
422
423            if self.output_schema is not None:
424                if isinstance(parsed_output.output, str):
425                    parsed_output.output = parse_json_string(parsed_output.output)
426                if not isinstance(parsed_output.output, dict):
427                    raise RuntimeError(
428                        f"structured response is not a dict: {parsed_output.output}"
429                    )
430                validate_schema_with_value_error(
431                    parsed_output.output,
432                    self.output_schema,
433                    "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
434                )
435            else:
436                if not isinstance(parsed_output.output, str):
437                    raise RuntimeError(
438                        f"response is not a string for non-structured task: {parsed_output.output}"
439                    )
440
441            trace_has_toolcalls = parsed_output.trace is not None and any(
442                message.get("role", None) == "tool" for message in parsed_output.trace
443            )
444            if (
445                provider.reasoning_capable
446                and (
447                    not parsed_output.intermediate_outputs
448                    or "reasoning" not in parsed_output.intermediate_outputs
449                )
450                and not (
451                    provider.reasoning_optional_for_structured_output
452                    and self.has_structured_output()
453                )
454                and not trace_has_toolcalls
455            ):
456                raise RuntimeError(
457                    "Reasoning is required for this model, but no reasoning was returned."
458                )
459
460            run_output = parsed_output
461
462        run = self.generate_run(
463            input, input_source, run_output, usage, run_output.trace, parent_task_run
464        )
465
466        if (
467            self.base_adapter_config.allow_saving
468            and Config.shared().autosave_runs
469            and self.task.path is not None
470        ):
471            run.save_to_file()
472        else:
473            run.id = None
474
475        return run
476
477    def has_structured_output(self) -> bool:
478        return self.output_schema is not None
479
480    @abstractmethod
481    def adapter_name(self) -> str:
482        pass
483
484    @abstractmethod
485    async def _run(
486        self,
487        input: InputType,
488        prior_trace: list[ChatCompletionMessageParam] | None = None,
489    ) -> Tuple[RunOutput, Usage | None]:
490        pass
491
492    def _create_run_stream(
493        self,
494        input: InputType,
495        prior_trace: list[ChatCompletionMessageParam] | None = None,
496    ) -> AdapterStream:
497        """Create a stream for the adapter. Implementations must override this method to support streaming."""
498        raise NotImplementedError("Streaming is not supported for this adapter type")
499
500    def build_prompt(self) -> str:
501        if self.prompt_builder is None:
502            raise ValueError("Prompt builder is not available for MCP run config")
503        # The prompt builder needs to know if we want to inject formatting instructions
504        structured_output_mode = as_kiln_agent_run_config(
505            self.run_config
506        ).structured_output_mode
507        add_json_instructions = self.has_structured_output() and (
508            structured_output_mode == StructuredOutputMode.json_instructions
509            or structured_output_mode
510            == StructuredOutputMode.json_instruction_and_object
511        )
512
513        return self.prompt_builder.build_prompt(
514            include_json_instructions=add_json_instructions,
515            skills=self._resolve_skills(),
516        )
517
518    def _resolve_skills(self) -> list[Skill]:
519        """Resolve skills from the injected skills dict.
520
521        Uses the pre-loaded skills dict from AdapterConfig. Caches the result
522        so that build_prompt and available_tools don't repeat
523        the lookup. Raises ValueError if the run config references a skill
524        that is not in the injected dict.
525        """
526        if self._resolved_skills is not None:
527            return self._resolved_skills
528
529        if self.run_config.type != "kiln_agent":
530            self._resolved_skills = []
531            return self._resolved_skills
532
533        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
534        if tool_config is None or tool_config.tools is None:
535            self._resolved_skills = []
536            return self._resolved_skills
537
538        skill_tool_ids = [
539            tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX)
540        ]
541        if not skill_tool_ids:
542            self._resolved_skills = []
543            return self._resolved_skills
544
545        injected = self.base_adapter_config.skills
546        if injected is None:
547            raise ValueError(
548                "Run config references skills but no skills dict was provided via "
549                "AdapterConfig(skills=...). Use load_skills_for_task() to pre-load "
550                "skills and pass them to the adapter."
551            )
552
553        skills: list[Skill] = []
554        seen: set[str] = set()
555        for tool_id in skill_tool_ids:
556            sid = skill_id_from_tool_id(tool_id)
557            if sid not in injected:
558                raise ValueError(
559                    f"Skill {sid} referenced in run config but not found in the "
560                    "injected skills dict."
561                )
562            if sid in seen:
563                continue
564            seen.add(sid)
565            skills.append(injected[sid])
566
567        self._resolved_skills = skills
568        return self._resolved_skills
569
570    def build_chat_formatter(
571        self,
572        input: InputType,
573        prior_trace: list[ChatCompletionMessageParam] | None = None,
574    ) -> ChatFormatter:
575        prior_trace = self._normalize_prior_trace(prior_trace)
576        self._reject_multiturn_with_structured_input(prior_trace)
577        if prior_trace is not None:
578            return MultiturnFormatter(prior_trace, input)
579        if self.prompt_builder is None:
580            raise ValueError("Prompt builder is not available for MCP run config")
581        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
582
583        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
584        system_message = self.build_prompt()
585
586        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
587        if not cot_prompt:
588            return get_chat_formatter(
589                strategy=ChatStrategy.single_turn,
590                system_message=system_message,
591                user_input=input,
592            )
593
594        # Some models like finetunes are trained with a specific chat strategy. Use that.
595        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
596        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
597        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
598            return get_chat_formatter(
599                strategy=tuned_chat_strategy,
600                system_message=system_message,
601                user_input=input,
602                thinking_instructions=cot_prompt,
603            )
604
605        # Pick the best chat strategy for the model given it has a cot prompt.
606        reasoning_capable = self.model_provider().reasoning_capable
607        if reasoning_capable:
608            # "Thinking" LLM designed to output thinking in a structured format. We'll use it's native format.
609            # A simple message with the COT prompt appended to the message list is sufficient
610            return get_chat_formatter(
611                strategy=ChatStrategy.single_turn_r1_thinking,
612                system_message=system_message,
613                user_input=input,
614                thinking_instructions=cot_prompt,
615            )
616        else:
617            # Unstructured output with COT
618            # Two calls to separate the thinking from the final response
619            return get_chat_formatter(
620                strategy=ChatStrategy.two_message_cot,
621                system_message=system_message,
622                user_input=input,
623                thinking_instructions=cot_prompt,
624            )
625
626    # create a run and task output
627    def generate_run(
628        self,
629        input: InputType,
630        input_source: DataSource | None,
631        run_output: RunOutput,
632        usage: Usage | None = None,
633        trace: list[ChatCompletionMessageParam] | None = None,
634        parent_task_run: TaskRun | None = None,
635    ) -> TaskRun:
636        output_str = (
637            json.dumps(run_output.output, ensure_ascii=False)
638            if isinstance(run_output.output, dict)
639            else run_output.output
640        )
641
642        output_source_type = (
643            DataSourceType.tool_call
644            if self.run_config.type == "mcp"
645            else DataSourceType.synthetic
646        )
647
648        new_output = TaskOutput(
649            output=output_str,
650            source=DataSource(
651                type=output_source_type,
652                properties=self._properties_for_task_output(),
653                run_config=self.run_config,
654            ),
655        )
656
657        # Convert input and output to JSON strings if they aren't strings
658        input_str = (
659            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
660        )
661
662        if input_source is None:
663            input_source = DataSource(
664                type=DataSourceType.human,
665                properties={"created_by": Config.shared().user_id},
666            )
667
668        parent_task_run_id: str | None = None
669        if parent_task_run is not None:
670            if parent_task_run.id is None:
671                raise ValueError(
672                    "parent_task_run must be persisted before using as parent: save the parent "
673                    "TaskRun (e.g. save_to_file()) so it has a stable id."
674                )
675            parent_task_run_id = parent_task_run.id
676
677        return TaskRun(
678            parent=self.task,
679            parent_task_run_id=parent_task_run_id,
680            input=input_str,
681            input_source=input_source,
682            output=new_output,
683            intermediate_outputs=run_output.intermediate_outputs,
684            tags=self.base_adapter_config.default_tags or [],
685            usage=usage,
686            trace=trace,
687            cumulative_usage=MessageUsage.from_trace(trace),
688        )
689
690    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
691        match self.run_config.type:
692            case "mcp":
693                return {}
694            case "kiln_agent":
695                if not isinstance(self.run_config, KilnAgentRunConfigProperties):
696                    raise ValueError("Kiln agent run config is required")
697                run_config = self.run_config
698
699                props: Dict[str, str | int | float] = {}
700                props["adapter_name"] = self.adapter_name()
701                # Legacy properties where we save the run_config details into custom properties.
702                # These are now also be saved in the run_config field.
703                props["model_name"] = run_config.model_name
704                props["model_provider"] = run_config.model_provider_name
705                props["prompt_id"] = run_config.prompt_id
706                props["structured_output_mode"] = run_config.structured_output_mode
707                props["temperature"] = run_config.temperature
708                props["top_p"] = run_config.top_p
709
710                return props
711            case _:
712                raise_exhaustive_enum_error(self.run_config.type)
713
714    def update_run_config_unknown_structured_output_mode(self) -> None:
715        if self.run_config.type != "kiln_agent":
716            return
717        run_config = as_kiln_agent_run_config(self.run_config)
718        structured_output_mode = run_config.structured_output_mode
719
720        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
721        # Look up our recommended mode from ml_model_list if we have one
722        if structured_output_mode == StructuredOutputMode.unknown:
723            new_run_config = run_config.model_copy(deep=True)
724            structured_output_mode = default_structured_output_mode_for_model_provider(
725                run_config.model_name,
726                run_config.model_provider_name,
727            )
728            new_run_config.structured_output_mode = structured_output_mode
729            self.run_config = new_run_config
730
731    async def available_tools(self) -> list[KilnToolInterface]:
732        if self.run_config.type != "kiln_agent":
733            return []
734        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
735        if tool_config is None or tool_config.tools is None:
736            return []
737
738        non_skill_tool_ids = [
739            tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX)
740        ]
741
742        tools: list[KilnToolInterface] = [
743            tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids
744        ]
745
746        skills = self._resolve_skills()
747        if skills:
748            seen_names: set[str] = set()
749            for skill in skills:
750                if skill.name in seen_names:
751                    raise ValueError(
752                        f"Duplicate skill name '{skill.name}'. Each skill must have a unique name."
753                    )
754                seen_names.add(skill.name)
755            tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))
756
757        tool_names = [await tool.name() for tool in tools]
758        if len(tool_names) != len(set(tool_names)):
759            raise ValueError(
760                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
761            )
762
763        return tools
764
765
class OpenAIStreamResult:
    """Async-iterable wrapper around the OpenAI streaming flow.

    Yields ``ModelResponseStream`` chunks.  After iteration the resulting
    ``TaskRun`` is available via the ``.task_run`` property.

    When return_on_tool_call=True and the model requests tool calls, the stream
    will stop and ``task_run.is_toolcall_pending`` will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        # Only stores arguments; no model call happens until iteration.
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run
        # Populated by __aiter__ once the stream is fully consumed.
        self._task_run: TaskRun | None = None

    @property
    def task_run(self) -> TaskRun:
        """The finalized TaskRun; raises RuntimeError if the stream is not yet consumed."""
        if self._task_run is None:
            raise RuntimeError(
                "Stream has not been fully consumed yet. "
                "Iterate over the stream before accessing .task_run"
            )
        return self._task_run

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        # Reset so a re-iteration can't expose a stale run from a prior pass.
        self._task_run = None
        # If no agent run ID exists yet, this iteration is the root agent and
        # owns the context: create it here, tear it down in the finally below.
        is_root_agent = get_agent_run_id() is None
        if is_root_agent:
            set_agent_run_id(generate_agent_run_id())

        try:
            adapter_stream = self._adapter._prepare_stream(
                self._input, self._prior_trace
            )

            # Pass through only raw model chunks; other event types are dropped.
            async for event in adapter_stream:
                if isinstance(event, ModelResponseStream):
                    yield event

            # Parse/validate output and build (optionally save) the run.
            self._task_run = self._adapter._finalize_stream(
                adapter_stream, self._input, self._input_source, self._parent_task_run
            )
        finally:
            if is_root_agent:
                try:
                    run_id = get_agent_run_id()
                    if run_id:
                        # Clean up the MCP session tied to this agent run.
                        await MCPSessionManager.shared().cleanup_session(run_id)
                finally:
                    # Always clear the context, even if cleanup fails.
                    clear_agent_run_id()
826
827
class AiSdkStreamResult:
    """Async-iterable wrapper around the AI SDK streaming flow.

    Yields ``AiSdkStreamEvent`` instances.  After iteration the resulting
    ``TaskRun`` is available via the ``.task_run`` property.

    When return_on_tool_call=True and the model requests tool calls, the FINISH
    event will have finishReason: "tool-calls" and ``task_run.is_toolcall_pending``
    will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        # Only stores arguments; no model call happens until iteration.
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run
        # Populated by __aiter__ once the stream is fully consumed.
        self._task_run: TaskRun | None = None

    @property
    def task_run(self) -> TaskRun:
        """The finalized TaskRun; raises RuntimeError if the stream is not yet consumed."""
        if self._task_run is None:
            raise RuntimeError(
                "Stream has not been fully consumed yet. "
                "Iterate over the stream before accessing .task_run"
            )
        return self._task_run

    async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]:
        # Reset so a re-iteration can't expose a stale run from a prior pass.
        self._task_run = None
        # If no agent run ID exists yet, this iteration is the root agent and
        # owns the context: create it here, tear it down in the finally below.
        is_root_agent = get_agent_run_id() is None
        if is_root_agent:
            set_agent_run_id(generate_agent_run_id())

        try:
            adapter_stream = self._adapter._prepare_stream(
                self._input, self._prior_trace
            )

            message_id = f"msg-{uuid.uuid4().hex}"
            converter = AiSdkStreamConverter()

            # Protocol preamble: one START per message, then the first step.
            yield StartEvent(messageId=message_id)
            yield StartStepEvent()

            # After a tool event, the next model chunk opens a new step, so the
            # converter state is reset at that boundary.
            last_event_was_tool_call = False
            async for event in adapter_stream:
                if isinstance(event, ModelResponseStream):
                    if last_event_was_tool_call:
                        converter.reset_for_next_step()
                        last_event_was_tool_call = False
                    for ai_event in converter.convert_chunk(event):
                        yield ai_event
                elif isinstance(event, ToolCallEvent):
                    last_event_was_tool_call = True
                    for ai_event in converter.convert_tool_event(event):
                        yield ai_event

            # Flush any blocks the converter still has open.
            for ai_event in converter.close_open_blocks():
                yield ai_event

            yield FinishStepEvent()

            # Parse/validate output and build (optionally save) the run.
            self._task_run = self._adapter._finalize_stream(
                adapter_stream, self._input, self._input_source, self._parent_task_run
            )

            if self._task_run.is_toolcall_pending:
                # return_on_tool_call path: tell the caller tool calls are
                # pending instead of emitting the normal finish sequence.
                yield FinishEvent(
                    messageMetadata=FinishMessageMetadata(finishReason="tool-calls"),
                )
            else:
                for ai_event in converter.finalize():
                    yield ai_event
        finally:
            if is_root_agent:
                try:
                    run_id = get_agent_run_id()
                    if run_id:
                        # Clean up the MCP session tied to this agent run.
                        await MCPSessionManager.shared().cleanup_session(run_id)
                finally:
                    # Always clear the context, even if cleanup fails.
                    clear_agent_run_id()
SkillsDict = typing.Dict[str, kiln_ai.datamodel.Skill]
@dataclass
class AdapterConfig:
@dataclass
class AdapterConfig:
    """
    An adapter config is config options that do NOT impact the output of the model.

    For example: if it's saved, or if we request additional data like logprobs.
    """

    # When False, the run is never persisted to disk (see the save logic in
    # BaseAdapter: saving also requires autosave and a task path).
    allow_saving: bool = True
    top_logprobs: int | None = None
    default_tags: list[str] | None = None

    """
    A custom prompt builder can be injected to override the system prompt building process.
    If not provided, the prompt builder will be created from the run_config.prompt_id which
    may load additional files from disk.
    """
    prompt_builder: BasePromptBuilder | None = None

    """
    Pre-loaded skills keyed by skill ID. When the run config references skills,
    they are looked up from this dict instead of reading from the filesystem.
    Use load_skills_for_task() to build this dict.
    """
    skills: SkillsDict | None = None

    """
    When True, the adapter will stop and return control to the caller when a tool call
    is invoked, instead of processing tool calls internally. Default is False (process
    tool calls internally).
    """
    return_on_tool_call: bool = False

    """
    Extra tools provided directly by the caller, in addition to tools resolved from the
    task's tool registry. These are sent to the model together with registry tools, and
    their names must not collide with registry tool names.

    If ``return_on_tool_call`` is False (the default), the adapter executes these tools
    itself just like registry tools. If True, the adapter returns as soon as the model
    requests a tool call and the caller is responsible for running the tool and passing
    results back via ``prior_trace``.
    """
    unmanaged_tools: list[KilnToolInterface] | None = None

    """
    When True, automatically inject prompt caching hints into completion
    requests. This is a cost optimization and does not affect model output.
    """
    automatic_prompt_caching: bool = False

An adapter config is config options that do NOT impact the output of the model.

For example: if it's saved, or if we request additional data like logprobs.

AdapterConfig( allow_saving: bool = True, top_logprobs: int | None = None, default_tags: list[str] | None = None, prompt_builder: kiln_ai.adapters.prompt_builders.BasePromptBuilder | None = None, skills: Optional[Dict[str, kiln_ai.datamodel.Skill]] = None, return_on_tool_call: bool = False, unmanaged_tools: list[kiln_ai.tools.KilnToolInterface] | None = None, automatic_prompt_caching: bool = False)
allow_saving: bool = True
top_logprobs: int | None = None
default_tags: list[str] | None = None

A custom prompt builder can be injected to override the system prompt building process. If not provided, the prompt builder will be created from the run_config.prompt_id which may load additional files from disk.

Pre-loaded skills keyed by skill ID. When the run config references skills, they are looked up from this dict instead of reading from the filesystem. Use load_skills_for_task() to build this dict.

skills: Optional[Dict[str, kiln_ai.datamodel.Skill]] = None

When True, the adapter will stop and return control to the caller when a tool call is invoked, instead of processing tool calls internally. Default is False (process tool calls internally).

return_on_tool_call: bool = False

Extra tools provided directly by the caller, in addition to tools resolved from the task's tool registry. These are sent to the model together with registry tools, and their names must not collide with registry tool names.

If return_on_tool_call is False (the default), the adapter executes these tools itself just like registry tools. If True, the adapter returns as soon as the model requests a tool call and the caller is responsible for running the tool and passing results back via prior_trace.

unmanaged_tools: list[kiln_ai.tools.KilnToolInterface] | None = None

When True, automatically inject prompt caching hints into completion requests. This is a cost optimization and does not affect model output.

automatic_prompt_caching: bool = False
class BaseAdapter:
131class BaseAdapter(metaclass=ABCMeta):
132    """Base class for AI model adapters that handle task execution.
133
134    This abstract class provides the foundation for implementing model-specific adapters
135    that can process tasks with structured or unstructured inputs/outputs. It handles
136    input/output validation, prompt building, and run tracking.
137
138    Prompt building is handled internally by the adapter, which uses a prompt builder
139    based on the run config. To override the prompt building behavior, pass a custom prompt
140    builder to the adapter config.
141    """
142
    def __init__(
        self,
        task: Task,
        run_config: RunConfigProperties,
        config: AdapterConfig | None = None,
    ):
        """Initialize the adapter.

        Args:
            task: The task this adapter will execute.
            run_config: Properties selecting model, provider, prompt, etc.
            config: Optional adapter-level options that do not affect model output.
        """
        self.task = task
        self.run_config: RunConfigProperties = run_config
        self.base_adapter_config = config or AdapterConfig()

        if isinstance(run_config, KilnAgentRunConfigProperties):
            # Backfill a recommended structured output mode for configs saved
            # as "unknown", then build the prompt builder (an injected builder
            # takes priority over the run config's prompt_id).
            self.update_run_config_unknown_structured_output_mode()
            self.prompt_builder = (
                self.base_adapter_config.prompt_builder
                or prompt_builder_from_id(run_config.prompt_id, task)
            )
        else:
            # Non-agent (e.g. MCP) run configs have no prompt builder.
            self.prompt_builder = None
        # Lazily-populated caches (see model_provider / _resolve_skills).
        self._model_provider: KilnModelProvider | None = None
        self._resolved_skills: list[Skill] | None = None

        self.output_schema = task.output_json_schema
        self.input_schema = task.input_json_schema
166
167    def model_provider(self) -> KilnModelProvider:
168        """
169        Lazy load the model provider for this adapter.
170        """
171        if self._model_provider is not None:
172            return self._model_provider
173        run_config = as_kiln_agent_run_config(self.run_config)
174        if not run_config.model_name or not run_config.model_provider_name:
175            raise ValueError("model_name and model_provider_name must be provided")
176        self._model_provider = kiln_model_provider_from(
177            run_config.model_name, run_config.model_provider_name
178        )
179        if not self._model_provider:
180            raise ValueError(
181                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
182            )
183        return self._model_provider
184
185    @staticmethod
186    def _normalize_prior_trace(
187        prior_trace: list[ChatCompletionMessageParam] | None,
188    ) -> list[ChatCompletionMessageParam] | None:
189        if not prior_trace:
190            return None
191        return prior_trace
192
193    def _reject_multiturn_with_structured_input(
194        self,
195        prior_trace: list[ChatCompletionMessageParam] | None,
196    ) -> None:
197        if prior_trace is not None and self.input_schema is not None:
198            raise ValueError(
199                "Cannot run multiturn execution with a task that has a structured input schema. "
200                "Use an unstructured task, or call without prior_trace."
201            )
202
203    async def invoke(
204        self,
205        input: InputType,
206        input_source: DataSource | None = None,
207        prior_trace: list[ChatCompletionMessageParam] | None = None,
208        parent_task_run: TaskRun | None = None,
209    ) -> TaskRun:
210        task_run, _ = await self.invoke_returning_run_output(
211            input, input_source, prior_trace, parent_task_run
212        )
213        return task_run
214
    async def _run_returning_run_output(
        self,
        input: InputType,
        input_source: DataSource | None = None,
        prior_trace: list[ChatCompletionMessageParam] | None = None,
        parent_task_run: TaskRun | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        """Execute the task once and return the (possibly saved) TaskRun plus raw RunOutput.

        Flow: validate the input against the task's input schema, apply any
        provider-specific request formatting, call the model via _run, then —
        unless a tool call is pending — parse/validate the output and enforce
        reasoning requirements. Finally build the TaskRun and save it when
        saving is allowed, autosave is on, and the task has a path.
        """
        prior_trace = self._normalize_prior_trace(prior_trace)
        self._reject_multiturn_with_structured_input(prior_trace)

        # validate input, allowing arrays
        if self.input_schema is not None:
            validate_schema_with_value_error(
                input,
                self.input_schema,
                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
                require_object=False,
            )

        # Format model input for model call (we save the original input in the task without formatting)
        formatted_input = input
        formatter_id = self.model_provider().formatter
        if formatter_id is not None:
            formatter = request_formatter_from_id(formatter_id)
            formatted_input = formatter.format_input(input)

        # Run
        run_output, usage = await self._run(formatted_input, prior_trace=prior_trace)

        # A pending tool call skips parsing/validation: the output is not final.
        if not run_output.is_toolcall_pending:
            # Normal completion: parse and validate output
            provider = self.model_provider()
            parser = model_parser_from_id(provider.parser)
            parsed_output = parser.parse_output(original_output=run_output)

            # validate output
            if self.output_schema is not None:
                # Parse json to dict if we have structured output
                if isinstance(parsed_output.output, str):
                    parsed_output.output = parse_json_string(parsed_output.output)

                if not isinstance(parsed_output.output, dict):
                    raise RuntimeError(
                        f"structured response is not a dict: {parsed_output.output}"
                    )
                validate_schema_with_value_error(
                    parsed_output.output,
                    self.output_schema,
                    "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
                )
            else:
                if not isinstance(parsed_output.output, str):
                    raise RuntimeError(
                        f"response is not a string for non-structured task: {parsed_output.output}"
                    )

            trace_has_toolcalls = parsed_output.trace is not None and any(
                message.get("role", None) == "tool" for message in parsed_output.trace
            )

            # Validate reasoning content is present and required
            # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini).
            if (
                provider.reasoning_capable
                and (
                    not parsed_output.intermediate_outputs
                    or "reasoning" not in parsed_output.intermediate_outputs
                )
                and not (
                    provider.reasoning_optional_for_structured_output
                    and self.has_structured_output()
                )
                and not trace_has_toolcalls
            ):
                raise RuntimeError(
                    "Reasoning is required for this model, but no reasoning was returned."
                )

            run_output = parsed_output

        run = self.generate_run(
            input, input_source, run_output, usage, run_output.trace, parent_task_run
        )

        # Save the run if configured to do so, and we have a path to save to
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run, run_output
311
    async def invoke_returning_run_output(
        self,
        input: InputType,
        input_source: DataSource | None = None,
        prior_trace: list[ChatCompletionMessageParam] | None = None,
        parent_task_run: TaskRun | None = None,
    ) -> Tuple[TaskRun, RunOutput]:
        """Run the task, managing the agent-run context for root invocations.

        If no agent run ID exists yet, this call is the root agent: it creates
        the run context, and on exit cleans up the MCP session for that run ID
        and clears the context — even if the underlying run raises.
        """
        # Determine if this is the root agent (no existing run context)
        is_root_agent = get_agent_run_id() is None

        if is_root_agent:
            run_id = generate_agent_run_id()
            set_agent_run_id(run_id)

        try:
            return await self._run_returning_run_output(
                input, input_source, prior_trace, parent_task_run
            )
        finally:
            if is_root_agent:
                try:
                    run_id = get_agent_run_id()
                    if run_id:
                        await MCPSessionManager.shared().cleanup_session(run_id)
                finally:
                    # Always clear the context, even if session cleanup fails.
                    clear_agent_run_id()
338
339    def invoke_openai_stream(
340        self,
341        input: InputType,
342        input_source: DataSource | None = None,
343        prior_trace: list[ChatCompletionMessageParam] | None = None,
344        parent_task_run: TaskRun | None = None,
345    ) -> OpenAIStreamResult:
346        """Stream raw OpenAI-protocol chunks for the task execution.
347
348        Returns an async-iterable that yields ``ModelResponseStream`` chunks
349        as they arrive from the model.  After the iterator is exhausted the
350        run has been validated and saved (when configured).  The resulting
351        ``TaskRun`` is available via the ``.task_run`` property.
352
353        Tool-call rounds happen internally and are not surfaced; use
354        ``invoke_ai_sdk_stream`` if you need tool-call events.
355        """
356        return OpenAIStreamResult(
357            self, input, input_source, prior_trace, parent_task_run
358        )
359
360    def invoke_ai_sdk_stream(
361        self,
362        input: InputType,
363        input_source: DataSource | None = None,
364        prior_trace: list[ChatCompletionMessageParam] | None = None,
365        parent_task_run: TaskRun | None = None,
366    ) -> AiSdkStreamResult:
367        """Stream AI SDK protocol events for the task execution.
368
369        Returns an async-iterable that yields ``AiSdkStreamEvent`` instances
370        covering text, reasoning, tool-call lifecycle, step boundaries, and
371        control events.  After the iterator is exhausted the resulting
372        ``TaskRun`` is available via the ``.task_run`` property.
373        """
374        return AiSdkStreamResult(
375            self, input, input_source, prior_trace, parent_task_run
376        )
377
378    def _prepare_stream(
379        self,
380        input: InputType,
381        prior_trace: list[ChatCompletionMessageParam] | None,
382    ) -> AdapterStream:
383        prior_trace = self._normalize_prior_trace(prior_trace)
384        self._reject_multiturn_with_structured_input(prior_trace)
385
386        if self.input_schema is not None:
387            validate_schema_with_value_error(
388                input,
389                self.input_schema,
390                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
391                require_object=False,
392            )
393
394        formatted_input = input
395        formatter_id = self.model_provider().formatter
396        if formatter_id is not None:
397            formatter = request_formatter_from_id(formatter_id)
398            formatted_input = formatter.format_input(input)
399
400        return self._create_run_stream(formatted_input, prior_trace)
401
    def _finalize_stream(
        self,
        adapter_stream: AdapterStream,
        input: InputType,
        input_source: DataSource | None,
        parent_task_run: TaskRun | None = None,
    ) -> TaskRun:
        """Streaming invocations are only concerned with passing through events as they come in.
        At the end of the stream, we still need to validate the output, create a run and everything
        else that a non-streaming invocation would do.

        Mirrors the post-call portion of _run_returning_run_output: parse the
        output, validate it against the task schema, enforce reasoning
        requirements, then build and (optionally) save the TaskRun.
        """

        result: AdapterStreamResult = adapter_stream.result
        run_output = result.run_output
        usage = result.usage

        # A pending tool call skips parsing/validation: the output is not final.
        if not run_output.is_toolcall_pending:
            # Normal completion: parse and validate output
            provider = self.model_provider()
            parser = model_parser_from_id(provider.parser)
            parsed_output = parser.parse_output(original_output=run_output)

            if self.output_schema is not None:
                # Structured task: output must parse to a dict matching the schema.
                if isinstance(parsed_output.output, str):
                    parsed_output.output = parse_json_string(parsed_output.output)
                if not isinstance(parsed_output.output, dict):
                    raise RuntimeError(
                        f"structured response is not a dict: {parsed_output.output}"
                    )
                validate_schema_with_value_error(
                    parsed_output.output,
                    self.output_schema,
                    "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
                )
            else:
                if not isinstance(parsed_output.output, str):
                    raise RuntimeError(
                        f"response is not a string for non-structured task: {parsed_output.output}"
                    )

            trace_has_toolcalls = parsed_output.trace is not None and any(
                message.get("role", None) == "tool" for message in parsed_output.trace
            )
            # Reasoning is required for reasoning-capable models, except when the
            # trace used tools or the provider marks reasoning optional for
            # structured output.
            if (
                provider.reasoning_capable
                and (
                    not parsed_output.intermediate_outputs
                    or "reasoning" not in parsed_output.intermediate_outputs
                )
                and not (
                    provider.reasoning_optional_for_structured_output
                    and self.has_structured_output()
                )
                and not trace_has_toolcalls
            ):
                raise RuntimeError(
                    "Reasoning is required for this model, but no reasoning was returned."
                )

            run_output = parsed_output

        run = self.generate_run(
            input, input_source, run_output, usage, run_output.trace, parent_task_run
        )

        # Save only when allowed, autosave is on, and the task lives on disk.
        if (
            self.base_adapter_config.allow_saving
            and Config.shared().autosave_runs
            and self.task.path is not None
        ):
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted.
            run.id = None

        return run
477
478    def has_structured_output(self) -> bool:
479        return self.output_schema is not None
480
    @abstractmethod
    def adapter_name(self) -> str:
        """Return the name identifying this adapter implementation."""
        pass
484
    @abstractmethod
    async def _run(
        self,
        input: InputType,
        prior_trace: list[ChatCompletionMessageParam] | None = None,
    ) -> Tuple[RunOutput, Usage | None]:
        """Execute the model call for the (already formatted) input.

        Implemented by concrete adapters. Returns the raw RunOutput plus usage
        when the provider reports it (None otherwise).
        """
        pass
492
    def _create_run_stream(
        self,
        input: InputType,
        prior_trace: list[ChatCompletionMessageParam] | None = None,
    ) -> AdapterStream:
        """Create a stream for the adapter. Implementations must override this method to support streaming.

        Raises:
            NotImplementedError: if the adapter type does not support streaming.
        """
        raise NotImplementedError("Streaming is not supported for this adapter type")
500
501    def build_prompt(self) -> str:
502        if self.prompt_builder is None:
503            raise ValueError("Prompt builder is not available for MCP run config")
504        # The prompt builder needs to know if we want to inject formatting instructions
505        structured_output_mode = as_kiln_agent_run_config(
506            self.run_config
507        ).structured_output_mode
508        add_json_instructions = self.has_structured_output() and (
509            structured_output_mode == StructuredOutputMode.json_instructions
510            or structured_output_mode
511            == StructuredOutputMode.json_instruction_and_object
512        )
513
514        return self.prompt_builder.build_prompt(
515            include_json_instructions=add_json_instructions,
516            skills=self._resolve_skills(),
517        )
518
519    def _resolve_skills(self) -> list[Skill]:
520        """Resolve skills from the injected skills dict.
521
522        Uses the pre-loaded skills dict from AdapterConfig. Caches the result
523        so that build_prompt and available_tools don't repeat
524        the lookup. Raises ValueError if the run config references a skill
525        that is not in the injected dict.
526        """
527        if self._resolved_skills is not None:
528            return self._resolved_skills
529
530        if self.run_config.type != "kiln_agent":
531            self._resolved_skills = []
532            return self._resolved_skills
533
534        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
535        if tool_config is None or tool_config.tools is None:
536            self._resolved_skills = []
537            return self._resolved_skills
538
539        skill_tool_ids = [
540            tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX)
541        ]
542        if not skill_tool_ids:
543            self._resolved_skills = []
544            return self._resolved_skills
545
546        injected = self.base_adapter_config.skills
547        if injected is None:
548            raise ValueError(
549                "Run config references skills but no skills dict was provided via "
550                "AdapterConfig(skills=...). Use load_skills_for_task() to pre-load "
551                "skills and pass them to the adapter."
552            )
553
554        skills: list[Skill] = []
555        seen: set[str] = set()
556        for tool_id in skill_tool_ids:
557            sid = skill_id_from_tool_id(tool_id)
558            if sid not in injected:
559                raise ValueError(
560                    f"Skill {sid} referenced in run config but not found in the "
561                    "injected skills dict."
562                )
563            if sid in seen:
564                continue
565            seen.add(sid)
566            skills.append(injected[sid])
567
568        self._resolved_skills = skills
569        return self._resolved_skills
570
    def build_chat_formatter(
        self,
        input: InputType,
        prior_trace: list[ChatCompletionMessageParam] | None = None,
    ) -> ChatFormatter:
        """Select and build the chat formatter for this invocation.

        Multiturn calls (prior_trace) always use MultiturnFormatter. Otherwise
        the strategy is chosen from the prompt (COT or not), the model's tuned
        chat strategy, and whether the model is reasoning-capable.
        """
        prior_trace = self._normalize_prior_trace(prior_trace)
        self._reject_multiturn_with_structured_input(prior_trace)
        if prior_trace is not None:
            return MultiturnFormatter(prior_trace, input)
        if self.prompt_builder is None:
            raise ValueError("Prompt builder is not available for MCP run config")
        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.

        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
        system_message = self.build_prompt()

        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
        if not cot_prompt:
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn,
                system_message=system_message,
                user_input=input,
            )

        # Some models like finetunes are trained with a specific chat strategy. Use that.
        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
            return get_chat_formatter(
                strategy=tuned_chat_strategy,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )

        # Pick the best chat strategy for the model given it has a cot prompt.
        reasoning_capable = self.model_provider().reasoning_capable
        if reasoning_capable:
            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
            # A simple message with the COT prompt appended to the message list is sufficient
            return get_chat_formatter(
                strategy=ChatStrategy.single_turn_r1_thinking,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
        else:
            # Unstructured output with COT
            # Two calls to separate the thinking from the final response
            return get_chat_formatter(
                strategy=ChatStrategy.two_message_cot,
                system_message=system_message,
                user_input=input,
                thinking_instructions=cot_prompt,
            )
626
627    # create a run and task output
628    def generate_run(
629        self,
630        input: InputType,
631        input_source: DataSource | None,
632        run_output: RunOutput,
633        usage: Usage | None = None,
634        trace: list[ChatCompletionMessageParam] | None = None,
635        parent_task_run: TaskRun | None = None,
636    ) -> TaskRun:
           """Assemble a TaskRun (with its TaskOutput) from a completed run.

           - Output source type is tool_call for "mcp" run configs, synthetic
             otherwise; the run_config is embedded in the output's DataSource.
           - Dict inputs/outputs are serialized to JSON strings; when
             input_source is None a human source tagged with the current
             user_id is created.
           - Raises ValueError if parent_task_run is provided but has no id
             (it must be persisted first so the reference is stable).
           """
637        output_str = (
638            json.dumps(run_output.output, ensure_ascii=False)
639            if isinstance(run_output.output, dict)
               # NOTE(review): only dict outputs are JSON-serialized here; if
               # run_output.output can be a list it would pass through
               # unserialized — confirm outputs are always dict or str.
640            else run_output.output
641        )
642
643        output_source_type = (
644            DataSourceType.tool_call
645            if self.run_config.type == "mcp"
646            else DataSourceType.synthetic
647        )
648
649        new_output = TaskOutput(
650            output=output_str,
651            source=DataSource(
652                type=output_source_type,
653                properties=self._properties_for_task_output(),
654                run_config=self.run_config,
655            ),
656        )
657
658        # Convert input and output to JSON strings if they aren't strings
659        input_str = (
660            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
661        )
662
663        if input_source is None:
664            input_source = DataSource(
665                type=DataSourceType.human,
666                properties={"created_by": Config.shared().user_id},
667            )
668
669        parent_task_run_id: str | None = None
670        if parent_task_run is not None:
671            if parent_task_run.id is None:
672                raise ValueError(
673                    "parent_task_run must be persisted before using as parent: save the parent "
674                    "TaskRun (e.g. save_to_file()) so it has a stable id."
675                )
676            parent_task_run_id = parent_task_run.id
677
678        return TaskRun(
679            parent=self.task,
680            parent_task_run_id=parent_task_run_id,
681            input=input_str,
682            input_source=input_source,
683            output=new_output,
684            intermediate_outputs=run_output.intermediate_outputs,
685            tags=self.base_adapter_config.default_tags or [],
686            usage=usage,
687            trace=trace,
688            cumulative_usage=MessageUsage.from_trace(trace),
689        )
690
691    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
           """Build the DataSource properties dict for the task output.

           MCP run configs get no extra properties; kiln_agent run configs get
           legacy flat properties mirroring the run_config fields. Raises
           ValueError if a kiln_agent config is not the expected type.
           """
692        match self.run_config.type:
693            case "mcp":
694                return {}
695            case "kiln_agent":
696                if not isinstance(self.run_config, KilnAgentRunConfigProperties):
697                    raise ValueError("Kiln agent run config is required")
698                run_config = self.run_config
699
700                props: Dict[str, str | int | float] = {}
701                props["adapter_name"] = self.adapter_name()
702                # Legacy properties where we save the run_config details into custom properties.
703                # These are now also saved in the run_config field.
704                props["model_name"] = run_config.model_name
705                props["model_provider"] = run_config.model_provider_name
706                props["prompt_id"] = run_config.prompt_id
707                props["structured_output_mode"] = run_config.structured_output_mode
708                props["temperature"] = run_config.temperature
709                props["top_p"] = run_config.top_p
710
711                return props
712            case _:
713                raise_exhaustive_enum_error(self.run_config.type)
714
715    def update_run_config_unknown_structured_output_mode(self) -> None:
           """Backfill structured_output_mode when it is set to 'unknown'.

           No-op for non-kiln_agent run configs. When the mode is unknown,
           replaces self.run_config with a deep copy carrying the recommended
           default mode for the model/provider pair.
           """
716        if self.run_config.type != "kiln_agent":
717            return
718        run_config = as_kiln_agent_run_config(self.run_config)
719        structured_output_mode = run_config.structured_output_mode
720
721        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
722        # Look up our recommended mode from ml_model_list if we have one
723        if structured_output_mode == StructuredOutputMode.unknown:
724            new_run_config = run_config.model_copy(deep=True)
725            structured_output_mode = default_structured_output_mode_for_model_provider(
726                run_config.model_name,
727                run_config.model_provider_name,
728            )
729            new_run_config.structured_output_mode = structured_output_mode
730            self.run_config = new_run_config
731
732    async def available_tools(self) -> list[KilnToolInterface]:
           """Resolve the tools configured for this run.

           Returns [] for non-kiln_agent run configs or when no tools are
           configured. Non-skill tool ids are resolved via tool_from_id; all
           resolved skills are bundled into one combined SkillTool. Raises
           ValueError on duplicate skill names or duplicate tool names.
           """
733        if self.run_config.type != "kiln_agent":
734            return []
735        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
736        if tool_config is None or tool_config.tools is None:
737            return []
738
739        non_skill_tool_ids = [
740            tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX)
741        ]
742
743        tools: list[KilnToolInterface] = [
744            tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids
745        ]
746
747        skills = self._resolve_skills()
748        if skills:
749            seen_names: set[str] = set()
750            for skill in skills:
751                if skill.name in seen_names:
752                    raise ValueError(
753                        f"Duplicate skill name '{skill.name}'. Each skill must have a unique name."
754                    )
755                seen_names.add(skill.name)
756            tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))
757
758        tool_names = [await tool.name() for tool in tools]
759        if len(tool_names) != len(set(tool_names)):
760            raise ValueError(
761                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
762            )
763
764        return tools

Base class for AI model adapters that handle task execution.

This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.

Prompt building is handled internally by the adapter, which uses a prompt builder based on the run config. To override the prompt building behavior, pass a custom prompt builder to the adapter config.

task
run_config: Annotated[Union[Annotated[kiln_ai.datamodel.run_config.KilnAgentRunConfigProperties, Tag(tag='kiln_agent')], Annotated[kiln_ai.datamodel.run_config.McpRunConfigProperties, Tag(tag='mcp')]], Discriminator(discriminator=<function _get_run_config_type at 0x7f034561c680>, custom_error_type=None, custom_error_message=None, custom_error_context=None)]
base_adapter_config
output_schema
input_schema
def model_provider(self) -> kiln_ai.adapters.ml_model_list.KilnModelProvider:
167    def model_provider(self) -> KilnModelProvider:
168        """
169        Lazy load the model provider for this adapter.
170        """
171        if self._model_provider is not None:
172            return self._model_provider
173        run_config = as_kiln_agent_run_config(self.run_config)
174        if not run_config.model_name or not run_config.model_provider_name:
175            raise ValueError("model_name and model_provider_name must be provided")
176        self._model_provider = kiln_model_provider_from(
177            run_config.model_name, run_config.model_provider_name
178        )
179        if not self._model_provider:
180            raise ValueError(
181                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
182            )
183        return self._model_provider

Lazy load the model provider for this adapter.

async def invoke( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> kiln_ai.datamodel.TaskRun:
203    async def invoke(
204        self,
205        input: InputType,
206        input_source: DataSource | None = None,
207        prior_trace: list[ChatCompletionMessageParam] | None = None,
208        parent_task_run: TaskRun | None = None,
209    ) -> TaskRun:
210        task_run, _ = await self.invoke_returning_run_output(
211            input, input_source, prior_trace, parent_task_run
212        )
213        return task_run
async def invoke_returning_run_output( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
312    async def invoke_returning_run_output(
313        self,
314        input: InputType,
315        input_source: DataSource | None = None,
316        prior_trace: list[ChatCompletionMessageParam] | None = None,
317        parent_task_run: TaskRun | None = None,
318    ) -> Tuple[TaskRun, RunOutput]:
319        # Determine if this is the root agent (no existing run context)
320        is_root_agent = get_agent_run_id() is None
321
322        if is_root_agent:
323            run_id = generate_agent_run_id()
324            set_agent_run_id(run_id)
325
326        try:
327            return await self._run_returning_run_output(
328                input, input_source, prior_trace, parent_task_run
329            )
330        finally:
331            if is_root_agent:
332                try:
333                    run_id = get_agent_run_id()
334                    if run_id:
335                        await MCPSessionManager.shared().cleanup_session(run_id)
336                finally:
337                    clear_agent_run_id()
def invoke_openai_stream( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> OpenAIStreamResult:
339    def invoke_openai_stream(
340        self,
341        input: InputType,
342        input_source: DataSource | None = None,
343        prior_trace: list[ChatCompletionMessageParam] | None = None,
344        parent_task_run: TaskRun | None = None,
345    ) -> OpenAIStreamResult:
346        """Stream raw OpenAI-protocol chunks for the task execution.
347
348        Returns an async-iterable that yields ``ModelResponseStream`` chunks
349        as they arrive from the model.  After the iterator is exhausted the
350        run has been validated and saved (when configured).  The resulting
351        ``TaskRun`` is available via the ``.task_run`` property.
352
353        Tool-call rounds happen internally and are not surfaced; use
354        ``invoke_ai_sdk_stream`` if you need tool-call events.
355        """
356        return OpenAIStreamResult(
357            self, input, input_source, prior_trace, parent_task_run
358        )

Stream raw OpenAI-protocol chunks for the task execution.

Returns an async-iterable that yields ModelResponseStream chunks as they arrive from the model. After the iterator is exhausted the run has been validated and saved (when configured). The resulting TaskRun is available via the .task_run property.

Tool-call rounds happen internally and are not surfaced; use invoke_ai_sdk_stream if you need tool-call events.

def invoke_ai_sdk_stream( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> AiSdkStreamResult:
360    def invoke_ai_sdk_stream(
361        self,
362        input: InputType,
363        input_source: DataSource | None = None,
364        prior_trace: list[ChatCompletionMessageParam] | None = None,
365        parent_task_run: TaskRun | None = None,
366    ) -> AiSdkStreamResult:
367        """Stream AI SDK protocol events for the task execution.
368
369        Returns an async-iterable that yields ``AiSdkStreamEvent`` instances
370        covering text, reasoning, tool-call lifecycle, step boundaries, and
371        control events.  After the iterator is exhausted the resulting
372        ``TaskRun`` is available via the ``.task_run`` property.
373        """
374        return AiSdkStreamResult(
375            self, input, input_source, prior_trace, parent_task_run
376        )

Stream AI SDK protocol events for the task execution.

Returns an async-iterable that yields AiSdkStreamEvent instances covering text, reasoning, tool-call lifecycle, step boundaries, and control events. After the iterator is exhausted the resulting TaskRun is available via the .task_run property.

def has_structured_output(self) -> bool:
478    def has_structured_output(self) -> bool:
479        return self.output_schema is not None
@abstractmethod
def adapter_name(self) -> str:
481    @abstractmethod
482    def adapter_name(self) -> str:
483        pass
def build_prompt(self) -> str:
501    def build_prompt(self) -> str:
502        if self.prompt_builder is None:
503            raise ValueError("Prompt builder is not available for MCP run config")
504        # The prompt builder needs to know if we want to inject formatting instructions
505        structured_output_mode = as_kiln_agent_run_config(
506            self.run_config
507        ).structured_output_mode
508        add_json_instructions = self.has_structured_output() and (
509            structured_output_mode == StructuredOutputMode.json_instructions
510            or structured_output_mode
511            == StructuredOutputMode.json_instruction_and_object
512        )
513
514        return self.prompt_builder.build_prompt(
515            include_json_instructions=add_json_instructions,
516            skills=self._resolve_skills(),
517        )
def build_chat_formatter( self, input: Union[Dict[str, Any], List[Any], str], prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None) -> kiln_ai.adapters.chat.ChatFormatter:
571    def build_chat_formatter(
572        self,
573        input: InputType,
574        prior_trace: list[ChatCompletionMessageParam] | None = None,
575    ) -> ChatFormatter:
576        prior_trace = self._normalize_prior_trace(prior_trace)
577        self._reject_multiturn_with_structured_input(prior_trace)
578        if prior_trace is not None:
579            return MultiturnFormatter(prior_trace, input)
580        if self.prompt_builder is None:
581            raise ValueError("Prompt builder is not available for MCP run config")
582        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
583
584        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
585        system_message = self.build_prompt()
586
587        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
588        if not cot_prompt:
589            return get_chat_formatter(
590                strategy=ChatStrategy.single_turn,
591                system_message=system_message,
592                user_input=input,
593            )
594
595        # Some models like finetunes are trained with a specific chat strategy. Use that.
596        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
597        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
598        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
599            return get_chat_formatter(
600                strategy=tuned_chat_strategy,
601                system_message=system_message,
602                user_input=input,
603                thinking_instructions=cot_prompt,
604            )
605
606        # Pick the best chat strategy for the model given it has a cot prompt.
607        reasoning_capable = self.model_provider().reasoning_capable
608        if reasoning_capable:
609            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
610            # A simple message with the COT prompt appended to the message list is sufficient
611            return get_chat_formatter(
612                strategy=ChatStrategy.single_turn_r1_thinking,
613                system_message=system_message,
614                user_input=input,
615                thinking_instructions=cot_prompt,
616            )
617        else:
618            # Unstructured output with COT
619            # Two calls to separate the thinking from the final response
620            return get_chat_formatter(
621                strategy=ChatStrategy.two_message_cot,
622                system_message=system_message,
623                user_input=input,
624                thinking_instructions=cot_prompt,
625            )
def generate_run( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, run_output: kiln_ai.adapters.run_output.RunOutput, usage: kiln_ai.datamodel.Usage | None = None, trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> kiln_ai.datamodel.TaskRun:
628    def generate_run(
629        self,
630        input: InputType,
631        input_source: DataSource | None,
632        run_output: RunOutput,
633        usage: Usage | None = None,
634        trace: list[ChatCompletionMessageParam] | None = None,
635        parent_task_run: TaskRun | None = None,
636    ) -> TaskRun:
637        output_str = (
638            json.dumps(run_output.output, ensure_ascii=False)
639            if isinstance(run_output.output, dict)
640            else run_output.output
641        )
642
643        output_source_type = (
644            DataSourceType.tool_call
645            if self.run_config.type == "mcp"
646            else DataSourceType.synthetic
647        )
648
649        new_output = TaskOutput(
650            output=output_str,
651            source=DataSource(
652                type=output_source_type,
653                properties=self._properties_for_task_output(),
654                run_config=self.run_config,
655            ),
656        )
657
658        # Convert input and output to JSON strings if they aren't strings
659        input_str = (
660            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
661        )
662
663        if input_source is None:
664            input_source = DataSource(
665                type=DataSourceType.human,
666                properties={"created_by": Config.shared().user_id},
667            )
668
669        parent_task_run_id: str | None = None
670        if parent_task_run is not None:
671            if parent_task_run.id is None:
672                raise ValueError(
673                    "parent_task_run must be persisted before using as parent: save the parent "
674                    "TaskRun (e.g. save_to_file()) so it has a stable id."
675                )
676            parent_task_run_id = parent_task_run.id
677
678        return TaskRun(
679            parent=self.task,
680            parent_task_run_id=parent_task_run_id,
681            input=input_str,
682            input_source=input_source,
683            output=new_output,
684            intermediate_outputs=run_output.intermediate_outputs,
685            tags=self.base_adapter_config.default_tags or [],
686            usage=usage,
687            trace=trace,
688            cumulative_usage=MessageUsage.from_trace(trace),
689        )
def update_run_config_unknown_structured_output_mode(self) -> None:
715    def update_run_config_unknown_structured_output_mode(self) -> None:
716        if self.run_config.type != "kiln_agent":
717            return
718        run_config = as_kiln_agent_run_config(self.run_config)
719        structured_output_mode = run_config.structured_output_mode
720
721        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
722        # Look up our recommended mode from ml_model_list if we have one
723        if structured_output_mode == StructuredOutputMode.unknown:
724            new_run_config = run_config.model_copy(deep=True)
725            structured_output_mode = default_structured_output_mode_for_model_provider(
726                run_config.model_name,
727                run_config.model_provider_name,
728            )
729            new_run_config.structured_output_mode = structured_output_mode
730            self.run_config = new_run_config
async def available_tools(self) -> list[kiln_ai.tools.KilnToolInterface]:
732    async def available_tools(self) -> list[KilnToolInterface]:
733        if self.run_config.type != "kiln_agent":
734            return []
735        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
736        if tool_config is None or tool_config.tools is None:
737            return []
738
739        non_skill_tool_ids = [
740            tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX)
741        ]
742
743        tools: list[KilnToolInterface] = [
744            tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids
745        ]
746
747        skills = self._resolve_skills()
748        if skills:
749            seen_names: set[str] = set()
750            for skill in skills:
751                if skill.name in seen_names:
752                    raise ValueError(
753                        f"Duplicate skill name '{skill.name}'. Each skill must have a unique name."
754                    )
755                seen_names.add(skill.name)
756            tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))
757
758        tool_names = [await tool.name() for tool in tools]
759        if len(tool_names) != len(set(tool_names)):
760            raise ValueError(
761                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
762            )
763
764        return tools
767class OpenAIStreamResult:
768    """Async-iterable wrapper around the OpenAI streaming flow.
769
770    Yields ``ModelResponseStream`` chunks.  After iteration the resulting
771    ``TaskRun`` is available via the ``.task_run`` property.
772
773    When return_on_tool_call=True and the model requests tool calls, the stream
774    will stop and ``task_run.is_toolcall_pending`` will be True.
775    """
776
777    def __init__(
778        self,
779        adapter: BaseAdapter,
780        input: InputType,
781        input_source: DataSource | None,
782        prior_trace: list[ChatCompletionMessageParam] | None,
783        parent_task_run: TaskRun | None = None,
784    ) -> None:
785        self._adapter = adapter
786        self._input = input
787        self._input_source = input_source
788        self._prior_trace = prior_trace
789        self._parent_task_run = parent_task_run
           # Populated by __aiter__ once the stream is fully consumed.
790        self._task_run: TaskRun | None = None
791
792    @property
793    def task_run(self) -> TaskRun:
           # Only available after the async iterator has been exhausted.
794        if self._task_run is None:
795            raise RuntimeError(
796                "Stream has not been fully consumed yet. "
797                "Iterate over the stream before accessing .task_run"
798            )
799        return self._task_run
800
801    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
802        self._task_run = None
           # We are the root agent when no agent run id is set; the root is
           # responsible for establishing the run id and cleaning up the MCP
           # session when iteration finishes (see finally below).
803        is_root_agent = get_agent_run_id() is None
804        if is_root_agent:
805            set_agent_run_id(generate_agent_run_id())
806
807        try:
808            adapter_stream = self._adapter._prepare_stream(
809                self._input, self._prior_trace
810            )
811
               # Surface only raw OpenAI-protocol chunks; other event types
               # from the adapter stream are consumed internally.
812            async for event in adapter_stream:
813                if isinstance(event, ModelResponseStream):
814                    yield event
815
816            self._task_run = self._adapter._finalize_stream(
817                adapter_stream, self._input, self._input_source, self._parent_task_run
818            )
819        finally:
820            if is_root_agent:
821                try:
822                    run_id = get_agent_run_id()
823                    if run_id:
824                        await MCPSessionManager.shared().cleanup_session(run_id)
825                finally:
                       # Always clear the run id, even if session cleanup raised.
826                    clear_agent_run_id()

Async-iterable wrapper around the OpenAI streaming flow.

Yields ModelResponseStream chunks. After iteration the resulting TaskRun is available via the .task_run property.

When return_on_tool_call=True and the model requests tool calls, the stream will stop and task_run.is_toolcall_pending will be True.

OpenAIStreamResult( adapter: BaseAdapter, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None)
777    def __init__(
778        self,
779        adapter: BaseAdapter,
780        input: InputType,
781        input_source: DataSource | None,
782        prior_trace: list[ChatCompletionMessageParam] | None,
783        parent_task_run: TaskRun | None = None,
784    ) -> None:
785        self._adapter = adapter
786        self._input = input
787        self._input_source = input_source
788        self._prior_trace = prior_trace
789        self._parent_task_run = parent_task_run
790        self._task_run: TaskRun | None = None
task_run: kiln_ai.datamodel.TaskRun
792    @property
793    def task_run(self) -> TaskRun:
794        if self._task_run is None:
795            raise RuntimeError(
796                "Stream has not been fully consumed yet. "
797                "Iterate over the stream before accessing .task_run"
798            )
799        return self._task_run
class AiSdkStreamResult:
class AiSdkStreamResult:
    """Async-iterable wrapper around the AI SDK streaming flow.

    Yields ``AiSdkStreamEvent`` instances.  After iteration the resulting
    ``TaskRun`` is available via the ``.task_run`` property.

    When return_on_tool_call=True and the model requests tool calls, the FINISH
    event will have finishReason: "tool-calls" and ``task_run.is_toolcall_pending``
    will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run
        # Set by __aiter__ only after the stream has been fully consumed.
        self._task_run: TaskRun | None = None

    @property
    def task_run(self) -> TaskRun:
        # Guard: the run only exists once the async iteration completes.
        if self._task_run is None:
            raise RuntimeError(
                "Stream has not been fully consumed yet. "
                "Iterate over the stream before accessing .task_run"
            )
        return self._task_run

    async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]:
        # Reset so re-iteration cannot expose a stale TaskRun.
        self._task_run = None
        # Only the outermost (root) agent run creates the agent-run id and
        # performs the MCP session cleanup in the finally block below.
        is_root_agent = get_agent_run_id() is None
        if is_root_agent:
            set_agent_run_id(generate_agent_run_id())

        try:
            adapter_stream = self._adapter._prepare_stream(
                self._input, self._prior_trace
            )

            message_id = f"msg-{uuid.uuid4().hex}"
            converter = AiSdkStreamConverter()

            # Open the message: START then START_STEP, before any content.
            yield StartEvent(messageId=message_id)
            yield StartStepEvent()

            last_event_was_tool_call = False
            async for event in adapter_stream:
                if isinstance(event, ModelResponseStream):
                    # A model chunk arriving after a tool call means a new
                    # step began; clear the converter's per-step state first.
                    if last_event_was_tool_call:
                        converter.reset_for_next_step()
                        last_event_was_tool_call = False
                    for ai_event in converter.convert_chunk(event):
                        yield ai_event
                elif isinstance(event, ToolCallEvent):
                    last_event_was_tool_call = True
                    for ai_event in converter.convert_tool_event(event):
                        yield ai_event

            # Emit closing events for any blocks the converter left open.
            for ai_event in converter.close_open_blocks():
                yield ai_event

            yield FinishStepEvent()

            # Build the TaskRun from the consumed stream; exposed via .task_run.
            self._task_run = self._adapter._finalize_stream(
                adapter_stream, self._input, self._input_source, self._parent_task_run
            )

            if self._task_run.is_toolcall_pending:
                # The stream stopped on pending tool calls: report the
                # "tool-calls" finish reason instead of a normal finish.
                yield FinishEvent(
                    messageMetadata=FinishMessageMetadata(finishReason="tool-calls"),
                )
            else:
                for ai_event in converter.finalize():
                    yield ai_event
        finally:
            # Root runs tear down the shared MCP session; the inner finally
            # guarantees the agent-run id is cleared even if cleanup raises.
            if is_root_agent:
                try:
                    run_id = get_agent_run_id()
                    if run_id:
                        await MCPSessionManager.shared().cleanup_session(run_id)
                finally:
                    clear_agent_run_id()

Async-iterable wrapper around the AI SDK streaming flow.

Yields AiSdkStreamEvent instances. After iteration the resulting TaskRun is available via the .task_run property.

When return_on_tool_call=True and the model requests tool calls, the FINISH event will have finishReason: "tool-calls" and task_run.is_toolcall_pending will be True.

AiSdkStreamResult( adapter: BaseAdapter, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None)
def __init__(
    self,
    adapter: BaseAdapter,
    input: InputType,
    input_source: DataSource | None,
    prior_trace: list[ChatCompletionMessageParam] | None,
    parent_task_run: TaskRun | None = None,
) -> None:
    """Record the stream configuration; iteration does the actual work."""
    # No TaskRun until the stream is fully consumed (see .task_run).
    self._task_run: TaskRun | None = None
    self._adapter = adapter
    self._parent_task_run = parent_task_run
    self._input = input
    self._prior_trace = prior_trace
    self._input_source = input_source
task_run: kiln_ai.datamodel.TaskRun
@property
def task_run(self) -> TaskRun:
    """The TaskRun from the finished stream; raises before consumption."""
    result = self._task_run
    if result is None:
        # Only set once __aiter__ has run to completion.
        raise RuntimeError(
            "Stream has not been fully consumed yet. "
            "Iterate over the stream before accessing .task_run"
        )
    return result