kiln_ai.adapters.model_adapters.base_adapter

   1from __future__ import annotations
   2
   3import json
   4import uuid
   5from abc import ABCMeta, abstractmethod
   6from dataclasses import dataclass
   7from typing import TYPE_CHECKING, AsyncIterator, Dict, Tuple
   8
   9from litellm.types.utils import ModelResponseStream
  10
  11from kiln_ai.adapters.chat.chat_formatter import (
  12    ChatFormatter,
  13    MultiturnFormatter,
  14    get_chat_formatter,
  15)
  16from kiln_ai.adapters.errors import KilnRunError, format_error_message
  17from kiln_ai.adapters.ml_model_list import (
  18    KilnModelProvider,
  19    StructuredOutputMode,
  20    default_structured_output_mode_for_model_provider,
  21)
  22from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStreamResult
  23from kiln_ai.adapters.model_adapters.stream_events import (
  24    AiSdkStreamConverter,
  25    AiSdkStreamEvent,
  26    FinishEvent,
  27    FinishMessageMetadata,
  28    FinishStepEvent,
  29    StartEvent,
  30    StartStepEvent,
  31    ToolCallEvent,
  32)
  33from kiln_ai.adapters.parsers.json_parser import parse_json_string
  34from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id
  35from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id
  36from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id
  37from kiln_ai.adapters.provider_tools import kiln_model_provider_from
  38from kiln_ai.adapters.run_output import RunOutput
  39from kiln_ai.datamodel import (
  40    DataSource,
  41    DataSourceType,
  42    MessageUsage,
  43    Task,
  44    TaskOutput,
  45    TaskRun,
  46    Usage,
  47)
  48from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType
  49from kiln_ai.datamodel.json_schema import validate_schema_with_value_error
  50from kiln_ai.datamodel.run_config import (
  51    KilnAgentRunConfigProperties,
  52    as_kiln_agent_run_config,
  53)
  54from kiln_ai.datamodel.skill import Skill
  55from kiln_ai.datamodel.task import RunConfigProperties
  56from kiln_ai.datamodel.tool_id import SKILL_TOOL_ID_PREFIX, skill_id_from_tool_id
  57
  58# Import agent run context for run lifecycle management
  59from kiln_ai.run_context import (
  60    clear_agent_run_id,
  61    generate_agent_run_id,
  62    get_agent_run_id,
  63    set_agent_run_id,
  64)
  65from kiln_ai.tools import KilnToolInterface
  66from kiln_ai.tools.mcp_session_manager import MCPSessionManager
  67from kiln_ai.tools.skill_tool import SkillTool
  68from kiln_ai.tools.tool_registry import tool_from_id
  69from kiln_ai.utils.config import Config
  70from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error
  71from kiln_ai.utils.jinja_engine import render_input_transform
  72from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam
  73
  74if TYPE_CHECKING:
  75    from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStream
  76
# Pre-loaded skills keyed by skill ID (the type of AdapterConfig.skills).
SkillsDict = Dict[str, Skill]
  78
  79
  80@dataclass
  81class AdapterConfig:
  82    """
  83    An adapter config is config options that do NOT impact the output of the model.
  84
  85    For example: if it's saved, of if we request additional data like logprobs.
  86    """
  87
  88    allow_saving: bool = True
  89    top_logprobs: int | None = None
  90    default_tags: list[str] | None = None
  91
  92    """
  93    The ID of the TaskRunConfig that originated this run, if any. Stored on the
  94    resulting TaskRun so the run can be traced back to its originating saved config
  95    (in addition to the inline run_config snapshot). None for ad-hoc/inline runs
  96    that were not initiated from a saved TaskRunConfig.
  97    """
  98    task_run_config_id: str | None = None
  99
 100    """
 101    A custom prompt builder can be injected to override the system prompt building process.
 102    If not provided, the prompt builder will be created from the run_config.prompt_id which
 103    may load additional files from disk.
 104    """
 105    prompt_builder: BasePromptBuilder | None = None
 106
 107    """
 108    Pre-loaded skills keyed by skill ID. When the run config references skills,
 109    they are looked up from this dict instead of reading from the filesystem.
 110    Use load_skills_for_task() to build this dict.
 111    """
 112    skills: SkillsDict | None = None
 113
 114    """
 115    When True, the adapter will stop and return control to the caller when a tool call
 116    is invoked, instead of processing tool calls internally. Default is False (process
 117    tool calls internally).
 118    """
 119    return_on_tool_call: bool = False
 120
 121    """
 122    Extra tools provided directly by the caller, in addition to tools resolved from the
 123    task's tool registry. These are sent to the model together with registry tools, and
 124    their names must not collide with registry tool names.
 125
 126    If ``return_on_tool_call`` is False (the default), the adapter executes these tools
 127    itself just like registry tools. If True, the adapter returns as soon as the model
 128    requests a tool call and the caller is responsible for running the tool and passing
 129    results back via ``prior_trace``.
 130    """
 131    unmanaged_tools: list[KilnToolInterface] | None = None
 132
 133    """
 134    When True, automatically inject prompt caching hints into completion
 135    requests. This is a cost optimization and does not affect model output.
 136    """
 137    automatic_prompt_caching: bool = False
 138
 139
 140class BaseAdapter(metaclass=ABCMeta):
 141    """Base class for AI model adapters that handle task execution.
 142
 143    This abstract class provides the foundation for implementing model-specific adapters
 144    that can process tasks with structured or unstructured inputs/outputs. It handles
 145    input/output validation, prompt building, and run tracking.
 146
 147    Prompt building is handled internally by the adapter, which uses a prompt builder
 148    based on the run config. To override the prompt building behavior, pass a custom prompt
 149    builder to the adapter config.
 150    """
 151
 152    def __init__(
 153        self,
 154        task: Task,
 155        run_config: RunConfigProperties,
 156        config: AdapterConfig | None = None,
 157    ):
 158        self.task = task
 159        self.run_config: RunConfigProperties = run_config
 160        self.base_adapter_config = config or AdapterConfig()
 161
 162        if isinstance(run_config, KilnAgentRunConfigProperties):
 163            self.update_run_config_unknown_structured_output_mode()
 164            self.prompt_builder = (
 165                self.base_adapter_config.prompt_builder
 166                or prompt_builder_from_id(run_config.prompt_id, task)
 167            )
 168        else:
 169            self.prompt_builder = None
 170        self._model_provider: KilnModelProvider | None = None
 171        self._resolved_skills: list[Skill] | None = None
 172
 173        self.output_schema = task.output_json_schema
 174        self.input_schema = task.input_json_schema
 175
 176    def model_provider(self) -> KilnModelProvider:
 177        """
 178        Lazy load the model provider for this adapter.
 179        """
 180        if self._model_provider is not None:
 181            return self._model_provider
 182        run_config = as_kiln_agent_run_config(self.run_config)
 183        if not run_config.model_name or not run_config.model_provider_name:
 184            raise ValueError("model_name and model_provider_name must be provided")
 185        self._model_provider = kiln_model_provider_from(
 186            run_config.model_name, run_config.model_provider_name
 187        )
 188        if not self._model_provider:
 189            raise ValueError(
 190                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
 191            )
 192        return self._model_provider
 193
 194    @staticmethod
 195    def _normalize_prior_trace(
 196        prior_trace: list[ChatCompletionMessageParam] | None,
 197    ) -> list[ChatCompletionMessageParam] | None:
 198        if not prior_trace:
 199            return None
 200        return prior_trace
 201
 202    def _reject_multiturn_with_structured_input(
 203        self,
 204        prior_trace: list[ChatCompletionMessageParam] | None,
 205    ) -> None:
 206        if prior_trace is not None and self.input_schema is not None:
 207            raise ValueError(
 208                "Cannot run multiturn execution with a task that has a structured input schema. "
 209                "Use an unstructured task, or call without prior_trace."
 210            )
 211
 212    async def invoke(
 213        self,
 214        input: InputType,
 215        input_source: DataSource | None = None,
 216        prior_trace: list[ChatCompletionMessageParam] | None = None,
 217        parent_task_run: TaskRun | None = None,
 218    ) -> TaskRun:
 219        task_run, _ = await self.invoke_returning_run_output(
 220            input, input_source, prior_trace, parent_task_run
 221        )
 222        return task_run
 223
 224    async def _run_returning_run_output(
 225        self,
 226        input: InputType,
 227        input_source: DataSource | None = None,
 228        prior_trace: list[ChatCompletionMessageParam] | None = None,
 229        parent_task_run: TaskRun | None = None,
 230    ) -> Tuple[TaskRun, RunOutput]:
 231        # Pre-run validation: these checks run before any model call, so
 232        # there is no trace to preserve. They stay outside the
 233        # exception-wrapping block and surface as plain exceptions.
 234        prior_trace = self._normalize_prior_trace(prior_trace)
 235        self._reject_multiturn_with_structured_input(prior_trace)
 236
 237        if self.input_schema is not None:
 238            validate_schema_with_value_error(
 239                input,
 240                self.input_schema,
 241                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
 242                require_object=False,
 243            )
 244
 245        # Apply input transform if configured. The original `input` is preserved
 246        # for TaskRun.input persistence; `model_input` is what the model sees.
 247        model_input = self._apply_input_transform(input)
 248
 249        # Format model input for model call (we save the original input in the
 250        # task without formatting). This runs in the adapter but before any
 251        # trace is built, so it also stays outside the wrapped region.
 252        formatted_input = model_input
 253        formatter_id = self.model_provider().formatter
 254        if formatter_id is not None:
 255            formatter = request_formatter_from_id(formatter_id)
 256            formatted_input = formatter.format_input(model_input)
 257
 258        # Allocate the trace-so-far list here so the reference survives any
 259        # exception thrown from inside `_run` (or the post-processing that
 260        # follows). `_run` must mutate this list in place (extend/append,
 261        # or `list[:] = ...`) and never rebind the local name.
 262        trace_ref: list[ChatCompletionMessageParam] = []
 263        try:
 264            run_output, usage = await self._run(
 265                formatted_input, trace_ref, prior_trace=prior_trace
 266            )
 267
 268            if not run_output.is_toolcall_pending:
 269                # Normal completion: parse and validate output
 270                provider = self.model_provider()
 271                parser = model_parser_from_id(provider.parser)
 272                parsed_output = parser.parse_output(original_output=run_output)
 273
 274                # validate output
 275                if self.output_schema is not None:
 276                    # Parse json to dict if we have structured output
 277                    if isinstance(parsed_output.output, str):
 278                        parsed_output.output = parse_json_string(parsed_output.output)
 279
 280                    if not isinstance(parsed_output.output, dict):
 281                        raise RuntimeError(
 282                            f"structured response is not a dict: {parsed_output.output}"
 283                        )
 284                    validate_schema_with_value_error(
 285                        parsed_output.output,
 286                        self.output_schema,
 287                        "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
 288                    )
 289                else:
 290                    if not isinstance(parsed_output.output, str):
 291                        raise RuntimeError(
 292                            f"response is not a string for non-structured task: {parsed_output.output}"
 293                        )
 294
 295                trace_has_toolcalls = parsed_output.trace is not None and any(
 296                    message.get("role", None) == "tool"
 297                    for message in parsed_output.trace
 298                )
 299
 300                # Validate reasoning content is present if required.
 301                # Models often skip reasoning on the final turn when tools are involved, so we don't require it then.
 302                if (
 303                    provider.reasoning_capable
 304                    and (
 305                        not parsed_output.intermediate_outputs
 306                        or "reasoning" not in parsed_output.intermediate_outputs
 307                    )
 308                    and not (
 309                        provider.reasoning_optional_for_structured_output
 310                        and self.has_structured_output()
 311                    )
 312                    and not trace_has_toolcalls
 313                ):
 314                    raise RuntimeError(
 315                        "Reasoning is required for this model, but no reasoning was returned."
 316                    )
 317
 318                run_output = parsed_output
 319
 320            run = self.generate_run(
 321                input,
 322                input_source,
 323                run_output,
 324                usage,
 325                run_output.trace,
 326                parent_task_run,
 327            )
 328
 329            # Save the run if configured to do so, and we have a path to save to
 330            if (
 331                self.base_adapter_config.allow_saving
 332                and Config.shared().autosave_runs
 333                and self.task.path is not None
 334            ):
 335                run.save_to_file()
 336            else:
 337                # Clear the ID to indicate it's not persisted
 338                run.id = None
 339
 340            return run, run_output
 341        except KilnRunError:
 342            # Already wrapped — pass through so we don't double-wrap.
 343            raise
 344        except Exception as e:
 345            # Trace conversion can itself throw (e.g., a malformed partial
 346            # assistant message was appended to `messages` just before the
 347            # real failure). Never let that swallow the original exception —
 348            # fall back to no trace so the user still sees the real error.
 349            partial_trace: list[ChatCompletionMessageParam] | None = None
 350            if trace_ref:
 351                try:
 352                    partial_trace = self._messages_to_trace(trace_ref)
 353                except Exception:
 354                    partial_trace = None
 355            raise KilnRunError(
 356                message=format_error_message(e),
 357                partial_trace=partial_trace,
 358                original=e,
 359            ) from e
 360
 361    async def invoke_returning_run_output(
 362        self,
 363        input: InputType,
 364        input_source: DataSource | None = None,
 365        prior_trace: list[ChatCompletionMessageParam] | None = None,
 366        parent_task_run: TaskRun | None = None,
 367    ) -> Tuple[TaskRun, RunOutput]:
 368        # Determine if this is the root agent (no existing run context)
 369        is_root_agent = get_agent_run_id() is None
 370
 371        if is_root_agent:
 372            run_id = generate_agent_run_id()
 373            set_agent_run_id(run_id)
 374
 375        try:
 376            return await self._run_returning_run_output(
 377                input, input_source, prior_trace, parent_task_run
 378            )
 379        finally:
 380            if is_root_agent:
 381                try:
 382                    run_id = get_agent_run_id()
 383                    if run_id:
 384                        await MCPSessionManager.shared().cleanup_session(run_id)
 385                finally:
 386                    clear_agent_run_id()
 387
 388    def invoke_openai_stream(
 389        self,
 390        input: InputType,
 391        input_source: DataSource | None = None,
 392        prior_trace: list[ChatCompletionMessageParam] | None = None,
 393        parent_task_run: TaskRun | None = None,
 394    ) -> OpenAIStreamResult:
 395        """Stream raw OpenAI-protocol chunks for the task execution.
 396
 397        Returns an async-iterable that yields ``ModelResponseStream`` chunks
 398        as they arrive from the model.  After the iterator is exhausted the
 399        run has been validated and saved (when configured).  The resulting
 400        ``TaskRun`` is available via the ``.task_run`` property.
 401
 402        Tool-call rounds happen internally and are not surfaced; use
 403        ``invoke_ai_sdk_stream`` if you need tool-call events.
 404        """
 405        return OpenAIStreamResult(
 406            self, input, input_source, prior_trace, parent_task_run
 407        )
 408
 409    def invoke_ai_sdk_stream(
 410        self,
 411        input: InputType,
 412        input_source: DataSource | None = None,
 413        prior_trace: list[ChatCompletionMessageParam] | None = None,
 414        parent_task_run: TaskRun | None = None,
 415    ) -> AiSdkStreamResult:
 416        """Stream AI SDK protocol events for the task execution.
 417
 418        Returns an async-iterable that yields ``AiSdkStreamEvent`` instances
 419        covering text, reasoning, tool-call lifecycle, step boundaries, and
 420        control events.  After the iterator is exhausted the resulting
 421        ``TaskRun`` is available via the ``.task_run`` property.
 422        """
 423        return AiSdkStreamResult(
 424            self, input, input_source, prior_trace, parent_task_run
 425        )
 426
 427    def _prepare_stream(
 428        self,
 429        input: InputType,
 430        prior_trace: list[ChatCompletionMessageParam] | None,
 431    ) -> AdapterStream:
 432        prior_trace = self._normalize_prior_trace(prior_trace)
 433        self._reject_multiturn_with_structured_input(prior_trace)
 434
 435        if self.input_schema is not None:
 436            validate_schema_with_value_error(
 437                input,
 438                self.input_schema,
 439                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
 440                require_object=False,
 441            )
 442
 443        model_input = self._apply_input_transform(input)
 444
 445        formatted_input = model_input
 446        formatter_id = self.model_provider().formatter
 447        if formatter_id is not None:
 448            formatter = request_formatter_from_id(formatter_id)
 449            formatted_input = formatter.format_input(model_input)
 450
 451        return self._create_run_stream(formatted_input, prior_trace)
 452
 453    def _finalize_stream(
 454        self,
 455        adapter_stream: AdapterStream,
 456        input: InputType,
 457        input_source: DataSource | None,
 458        parent_task_run: TaskRun | None = None,
 459    ) -> TaskRun:
 460        """Streaming invocations are only concerned with passing through events as they come in.
 461        At the end of the stream, we still need to validate the output, create a run and everything
 462        else that a non-streaming invocation would do.
 463        """
 464
 465        result: AdapterStreamResult = adapter_stream.result
 466        run_output = result.run_output
 467        usage = result.usage
 468
 469        if not run_output.is_toolcall_pending:
 470            # Normal completion: parse and validate output
 471            provider = self.model_provider()
 472            parser = model_parser_from_id(provider.parser)
 473            parsed_output = parser.parse_output(original_output=run_output)
 474
 475            if self.output_schema is not None:
 476                if isinstance(parsed_output.output, str):
 477                    parsed_output.output = parse_json_string(parsed_output.output)
 478                if not isinstance(parsed_output.output, dict):
 479                    raise RuntimeError(
 480                        f"structured response is not a dict: {parsed_output.output}"
 481                    )
 482                validate_schema_with_value_error(
 483                    parsed_output.output,
 484                    self.output_schema,
 485                    "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
 486                )
 487            else:
 488                if not isinstance(parsed_output.output, str):
 489                    raise RuntimeError(
 490                        f"response is not a string for non-structured task: {parsed_output.output}"
 491                    )
 492
 493            trace_has_toolcalls = parsed_output.trace is not None and any(
 494                message.get("role", None) == "tool" for message in parsed_output.trace
 495            )
 496            if (
 497                provider.reasoning_capable
 498                and (
 499                    not parsed_output.intermediate_outputs
 500                    or "reasoning" not in parsed_output.intermediate_outputs
 501                )
 502                and not (
 503                    provider.reasoning_optional_for_structured_output
 504                    and self.has_structured_output()
 505                )
 506                and not trace_has_toolcalls
 507            ):
 508                raise RuntimeError(
 509                    "Reasoning is required for this model, but no reasoning was returned."
 510                )
 511
 512            run_output = parsed_output
 513
 514        run = self.generate_run(
 515            input, input_source, run_output, usage, run_output.trace, parent_task_run
 516        )
 517
 518        if (
 519            self.base_adapter_config.allow_saving
 520            and Config.shared().autosave_runs
 521            and self.task.path is not None
 522        ):
 523            run.save_to_file()
 524        else:
 525            run.id = None
 526
 527        return run
 528
 529    def _apply_input_transform(self, input: InputType) -> InputType:
 530        """If the run config has an input_transform, render it and return the
 531        resulting string. Otherwise return input unchanged.
 532
 533        MCP run configs (no input_transform field) are a no-op.
 534        """
 535        if not isinstance(self.run_config, KilnAgentRunConfigProperties):
 536            return input
 537        transform = self.run_config.input_transform
 538        if transform is None:
 539            return input
 540        try:
 541            return render_input_transform(transform, input)
 542        except Exception as e:
 543            raise ValueError(f"Input transform failed: {e}") from e
 544
 545    def has_structured_output(self) -> bool:
 546        return self.output_schema is not None
 547
 548    @abstractmethod
 549    def adapter_name(self) -> str:
 550        pass
 551
 552    @abstractmethod
 553    async def _run(
 554        self,
 555        input: InputType,
 556        trace_ref: list[ChatCompletionMessageParam],
 557        prior_trace: list[ChatCompletionMessageParam] | None = None,
 558    ) -> Tuple[RunOutput, Usage | None]:
 559        """Run the model. Implementations MUST mutate `trace_ref` in place
 560        (extend/append, or `trace_ref[:] = ...`) — never rebind it — so the
 561        caller keeps a live reference to the partial trace if an exception
 562        escapes.
 563        """
 564        pass
 565
 566    def _messages_to_trace(
 567        self,
 568        messages: list[ChatCompletionMessageParam],
 569    ) -> list[ChatCompletionMessageParam]:
 570        """Convert the adapter's internal `messages` list to an API-safe trace.
 571
 572        Default implementation returns the list as-is. Adapters that store
 573        internal message objects (e.g. LiteLLM's `Message`) should override
 574        this to normalize to `ChatCompletionMessageParam` shapes.
 575        """
 576        return messages
 577
 578    def _create_run_stream(
 579        self,
 580        input: InputType,
 581        prior_trace: list[ChatCompletionMessageParam] | None = None,
 582    ) -> AdapterStream:
 583        """Create a stream for the adapter. Implementations must override this method to support streaming."""
 584        raise NotImplementedError("Streaming is not supported for this adapter type")
 585
 586    def build_prompt(self) -> str:
 587        if self.prompt_builder is None:
 588            raise ValueError("Prompt builder is not available for MCP run config")
 589        # The prompt builder needs to know if we want to inject formatting instructions
 590        structured_output_mode = as_kiln_agent_run_config(
 591            self.run_config
 592        ).structured_output_mode
 593        add_json_instructions = self.has_structured_output() and (
 594            structured_output_mode == StructuredOutputMode.json_instructions
 595            or structured_output_mode
 596            == StructuredOutputMode.json_instruction_and_object
 597        )
 598
 599        return self.prompt_builder.build_prompt(
 600            include_json_instructions=add_json_instructions,
 601            skills=self._resolve_skills(),
 602        )
 603
 604    def _resolve_skills(self) -> list[Skill]:
 605        """Resolve skills from the injected skills dict.
 606
 607        Uses the pre-loaded skills dict from AdapterConfig. Caches the result
 608        so that build_prompt and available_tools don't repeat
 609        the lookup. Raises ValueError if the run config references a skill
 610        that is not in the injected dict.
 611        """
 612        if self._resolved_skills is not None:
 613            return self._resolved_skills
 614
 615        if self.run_config.type != "kiln_agent":
 616            self._resolved_skills = []
 617            return self._resolved_skills
 618
 619        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
 620        if tool_config is None or tool_config.tools is None:
 621            self._resolved_skills = []
 622            return self._resolved_skills
 623
 624        skill_tool_ids = [
 625            tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX)
 626        ]
 627        if not skill_tool_ids:
 628            self._resolved_skills = []
 629            return self._resolved_skills
 630
 631        injected = self.base_adapter_config.skills
 632        if injected is None:
 633            raise ValueError(
 634                "Run config references skills but no skills dict was provided via "
 635                "AdapterConfig(skills=...). Use load_skills_for_task() to pre-load "
 636                "skills and pass them to the adapter."
 637            )
 638
 639        skills: list[Skill] = []
 640        seen: set[str] = set()
 641        for tool_id in skill_tool_ids:
 642            sid = skill_id_from_tool_id(tool_id)
 643            if sid not in injected:
 644                raise ValueError(
 645                    f"Skill {sid} referenced in run config but not found in the "
 646                    "injected skills dict."
 647                )
 648            if sid in seen:
 649                continue
 650            seen.add(sid)
 651            skills.append(injected[sid])
 652
 653        self._resolved_skills = skills
 654        return self._resolved_skills
 655
 656    def build_chat_formatter(
 657        self,
 658        input: InputType,
 659        prior_trace: list[ChatCompletionMessageParam] | None = None,
 660    ) -> ChatFormatter:
 661        prior_trace = self._normalize_prior_trace(prior_trace)
 662        self._reject_multiturn_with_structured_input(prior_trace)
 663        if prior_trace is not None:
 664            return MultiturnFormatter(prior_trace, input)
 665        if self.prompt_builder is None:
 666            raise ValueError("Prompt builder is not available for MCP run config")
 667        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
 668
 669        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
 670        system_message = self.build_prompt()
 671
 672        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
 673        if not cot_prompt:
 674            return get_chat_formatter(
 675                strategy=ChatStrategy.single_turn,
 676                system_message=system_message,
 677                user_input=input,
 678            )
 679
 680        # Some models like finetunes are trained with a specific chat strategy. Use that.
 681        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
 682        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
 683        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
 684            return get_chat_formatter(
 685                strategy=tuned_chat_strategy,
 686                system_message=system_message,
 687                user_input=input,
 688                thinking_instructions=cot_prompt,
 689            )
 690
 691        # Pick the best chat strategy for the model given it has a cot prompt.
 692        reasoning_capable = self.model_provider().reasoning_capable
 693        if reasoning_capable:
 694            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
 695            # A simple message with the COT prompt appended to the message list is sufficient
 696            return get_chat_formatter(
 697                strategy=ChatStrategy.single_turn_r1_thinking,
 698                system_message=system_message,
 699                user_input=input,
 700                thinking_instructions=cot_prompt,
 701            )
 702        else:
 703            # Unstructured output with COT
 704            # Two calls to separate the thinking from the final response
 705            return get_chat_formatter(
 706                strategy=ChatStrategy.two_message_cot,
 707                system_message=system_message,
 708                user_input=input,
 709                thinking_instructions=cot_prompt,
 710            )
 711
 712    # create a run and task output
 713    def generate_run(
 714        self,
 715        input: InputType,
 716        input_source: DataSource | None,
 717        run_output: RunOutput,
 718        usage: Usage | None = None,
 719        trace: list[ChatCompletionMessageParam] | None = None,
 720        parent_task_run: TaskRun | None = None,
 721    ) -> TaskRun:
 722        output_str = (
 723            json.dumps(run_output.output, ensure_ascii=False)
 724            if isinstance(run_output.output, dict)
 725            else run_output.output
 726        )
 727
 728        output_source_type = (
 729            DataSourceType.tool_call
 730            if self.run_config.type == "mcp"
 731            else DataSourceType.synthetic
 732        )
 733
 734        new_output = TaskOutput(
 735            output=output_str,
 736            source=DataSource(
 737                type=output_source_type,
 738                properties=self._properties_for_task_output(),
 739                run_config_id=self.base_adapter_config.task_run_config_id,
 740                run_config=self.run_config,
 741            ),
 742        )
 743
 744        # Convert input and output to JSON strings if they aren't strings
 745        input_str = (
 746            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
 747        )
 748
 749        if input_source is None:
 750            input_source = DataSource(
 751                type=DataSourceType.human,
 752                properties={"created_by": Config.shared().user_id},
 753            )
 754
 755        parent_task_run_id: str | None = None
 756        if parent_task_run is not None:
 757            if parent_task_run.id is None:
 758                raise ValueError(
 759                    "parent_task_run must be persisted before using as parent: save the parent "
 760                    "TaskRun (e.g. save_to_file()) so it has a stable id."
 761                )
 762            parent_task_run_id = parent_task_run.id
 763
 764        return TaskRun(
 765            parent=self.task,
 766            parent_task_run_id=parent_task_run_id,
 767            input=input_str,
 768            input_source=input_source,
 769            output=new_output,
 770            intermediate_outputs=run_output.intermediate_outputs,
 771            tags=self.base_adapter_config.default_tags or [],
 772            usage=usage,
 773            trace=trace,
 774            cumulative_usage=MessageUsage.from_trace(trace),
 775        )
 776
 777    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
 778        match self.run_config.type:
 779            case "mcp":
 780                return {}
 781            case "kiln_agent":
 782                if not isinstance(self.run_config, KilnAgentRunConfigProperties):
 783                    raise ValueError("Kiln agent run config is required")
 784                run_config = self.run_config
 785
 786                props: Dict[str, str | int | float] = {}
 787                props["adapter_name"] = self.adapter_name()
 788                # Legacy properties where we save the run_config details into custom properties.
 789                # These are now also be saved in the run_config field.
 790                props["model_name"] = run_config.model_name
 791                props["model_provider"] = run_config.model_provider_name
 792                props["prompt_id"] = run_config.prompt_id
 793                props["structured_output_mode"] = run_config.structured_output_mode
 794                props["temperature"] = run_config.temperature
 795                props["top_p"] = run_config.top_p
 796
 797                return props
 798            case _:
 799                raise_exhaustive_enum_error(self.run_config.type)
 800
 801    def update_run_config_unknown_structured_output_mode(self) -> None:
 802        if self.run_config.type != "kiln_agent":
 803            return
 804        run_config = as_kiln_agent_run_config(self.run_config)
 805        structured_output_mode = run_config.structured_output_mode
 806
 807        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
 808        # Look up our recommended mode from ml_model_list if we have one
 809        if structured_output_mode == StructuredOutputMode.unknown:
 810            new_run_config = run_config.model_copy(deep=True)
 811            structured_output_mode = default_structured_output_mode_for_model_provider(
 812                run_config.model_name,
 813                run_config.model_provider_name,
 814            )
 815            new_run_config.structured_output_mode = structured_output_mode
 816            self.run_config = new_run_config
 817
 818    async def available_tools(self) -> list[KilnToolInterface]:
 819        if self.run_config.type != "kiln_agent":
 820            return []
 821        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
 822        if tool_config is None or tool_config.tools is None:
 823            return []
 824
 825        non_skill_tool_ids = [
 826            tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX)
 827        ]
 828
 829        tools: list[KilnToolInterface] = [
 830            tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids
 831        ]
 832
 833        skills = self._resolve_skills()
 834        if skills:
 835            seen_names: set[str] = set()
 836            for skill in skills:
 837                if skill.name in seen_names:
 838                    raise ValueError(
 839                        f"Duplicate skill name '{skill.name}'. Each skill must have a unique name."
 840                    )
 841                seen_names.add(skill.name)
 842            tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))
 843
 844        tool_names = [await tool.name() for tool in tools]
 845        if len(tool_names) != len(set(tool_names)):
 846            raise ValueError(
 847                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
 848            )
 849
 850        return tools
 851
 852
 853class OpenAIStreamResult:
 854    """Async-iterable wrapper around the OpenAI streaming flow.
 855
 856    Yields ``ModelResponseStream`` chunks.  After iteration the resulting
 857    ``TaskRun`` is available via the ``.task_run`` property.
 858
 859    When return_on_tool_call=True and the model requests tool calls, the stream
 860    will stop and ``task_run.is_toolcall_pending`` will be True.
 861    """
 862
 863    def __init__(
 864        self,
 865        adapter: BaseAdapter,
 866        input: InputType,
 867        input_source: DataSource | None,
 868        prior_trace: list[ChatCompletionMessageParam] | None,
 869        parent_task_run: TaskRun | None = None,
 870    ) -> None:
 871        self._adapter = adapter
 872        self._input = input
 873        self._input_source = input_source
 874        self._prior_trace = prior_trace
 875        self._parent_task_run = parent_task_run
 876        self._task_run: TaskRun | None = None
 877
 878    @property
 879    def task_run(self) -> TaskRun:
 880        if self._task_run is None:
 881            raise RuntimeError(
 882                "Stream has not been fully consumed yet. "
 883                "Iterate over the stream before accessing .task_run"
 884            )
 885        return self._task_run
 886
 887    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
 888        self._task_run = None
 889        is_root_agent = get_agent_run_id() is None
 890        if is_root_agent:
 891            set_agent_run_id(generate_agent_run_id())
 892
 893        try:
 894            adapter_stream = self._adapter._prepare_stream(
 895                self._input, self._prior_trace
 896            )
 897
 898            async for event in adapter_stream:
 899                if isinstance(event, ModelResponseStream):
 900                    yield event
 901
 902            self._task_run = self._adapter._finalize_stream(
 903                adapter_stream, self._input, self._input_source, self._parent_task_run
 904            )
 905        finally:
 906            if is_root_agent:
 907                try:
 908                    run_id = get_agent_run_id()
 909                    if run_id:
 910                        await MCPSessionManager.shared().cleanup_session(run_id)
 911                finally:
 912                    clear_agent_run_id()
 913
 914
 915class AiSdkStreamResult:
 916    """Async-iterable wrapper around the AI SDK streaming flow.
 917
 918    Yields ``AiSdkStreamEvent`` instances.  After iteration the resulting
 919    ``TaskRun`` is available via the ``.task_run`` property.
 920
 921    When return_on_tool_call=True and the model requests tool calls, the FINISH
 922    event will have finishReason: "tool-calls" and ``task_run.is_toolcall_pending``
 923    will be True.
 924    """
 925
 926    def __init__(
 927        self,
 928        adapter: BaseAdapter,
 929        input: InputType,
 930        input_source: DataSource | None,
 931        prior_trace: list[ChatCompletionMessageParam] | None,
 932        parent_task_run: TaskRun | None = None,
 933    ) -> None:
 934        self._adapter = adapter
 935        self._input = input
 936        self._input_source = input_source
 937        self._prior_trace = prior_trace
 938        self._parent_task_run = parent_task_run
 939        self._task_run: TaskRun | None = None
 940
 941    @property
 942    def task_run(self) -> TaskRun:
 943        if self._task_run is None:
 944            raise RuntimeError(
 945                "Stream has not been fully consumed yet. "
 946                "Iterate over the stream before accessing .task_run"
 947            )
 948        return self._task_run
 949
 950    async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]:
 951        self._task_run = None
 952        is_root_agent = get_agent_run_id() is None
 953        if is_root_agent:
 954            set_agent_run_id(generate_agent_run_id())
 955
 956        try:
 957            adapter_stream = self._adapter._prepare_stream(
 958                self._input, self._prior_trace
 959            )
 960
 961            message_id = f"msg-{uuid.uuid4().hex}"
 962            converter = AiSdkStreamConverter()
 963
 964            yield StartEvent(messageId=message_id)
 965            yield StartStepEvent()
 966
 967            last_event_was_tool_call = False
 968            async for event in adapter_stream:
 969                if isinstance(event, ModelResponseStream):
 970                    if last_event_was_tool_call:
 971                        converter.reset_for_next_step()
 972                        last_event_was_tool_call = False
 973                    for ai_event in converter.convert_chunk(event):
 974                        yield ai_event
 975                elif isinstance(event, ToolCallEvent):
 976                    last_event_was_tool_call = True
 977                    for ai_event in converter.convert_tool_event(event):
 978                        yield ai_event
 979
 980            for ai_event in converter.close_open_blocks():
 981                yield ai_event
 982
 983            yield FinishStepEvent()
 984
 985            self._task_run = self._adapter._finalize_stream(
 986                adapter_stream, self._input, self._input_source, self._parent_task_run
 987            )
 988
 989            if self._task_run.is_toolcall_pending:
 990                yield FinishEvent(
 991                    messageMetadata=FinishMessageMetadata(finishReason="tool-calls"),
 992                )
 993            else:
 994                for ai_event in converter.finalize():
 995                    yield ai_event
 996        finally:
 997            if is_root_agent:
 998                try:
 999                    run_id = get_agent_run_id()
1000                    if run_id:
1001                        await MCPSessionManager.shared().cleanup_session(run_id)
1002                finally:
1003                    clear_agent_run_id()
SkillsDict = typing.Dict[str, kiln_ai.datamodel.Skill]
@dataclass
class AdapterConfig:
 81@dataclass
 82class AdapterConfig:
 83    """
 84    An adapter config is config options that do NOT impact the output of the model.
 85
 86    For example: if it's saved, of if we request additional data like logprobs.
 87    """
 88
 89    allow_saving: bool = True
 90    top_logprobs: int | None = None
 91    default_tags: list[str] | None = None
 92
 93    """
 94    The ID of the TaskRunConfig that originated this run, if any. Stored on the
 95    resulting TaskRun so the run can be traced back to its originating saved config
 96    (in addition to the inline run_config snapshot). None for ad-hoc/inline runs
 97    that were not initiated from a saved TaskRunConfig.
 98    """
 99    task_run_config_id: str | None = None
100
101    """
102    A custom prompt builder can be injected to override the system prompt building process.
103    If not provided, the prompt builder will be created from the run_config.prompt_id which
104    may load additional files from disk.
105    """
106    prompt_builder: BasePromptBuilder | None = None
107
108    """
109    Pre-loaded skills keyed by skill ID. When the run config references skills,
110    they are looked up from this dict instead of reading from the filesystem.
111    Use load_skills_for_task() to build this dict.
112    """
113    skills: SkillsDict | None = None
114
115    """
116    When True, the adapter will stop and return control to the caller when a tool call
117    is invoked, instead of processing tool calls internally. Default is False (process
118    tool calls internally).
119    """
120    return_on_tool_call: bool = False
121
122    """
123    Extra tools provided directly by the caller, in addition to tools resolved from the
124    task's tool registry. These are sent to the model together with registry tools, and
125    their names must not collide with registry tool names.
126
127    If ``return_on_tool_call`` is False (the default), the adapter executes these tools
128    itself just like registry tools. If True, the adapter returns as soon as the model
129    requests a tool call and the caller is responsible for running the tool and passing
130    results back via ``prior_trace``.
131    """
132    unmanaged_tools: list[KilnToolInterface] | None = None
133
134    """
135    When True, automatically inject prompt caching hints into completion
136    requests. This is a cost optimization and does not affect model output.
137    """
138    automatic_prompt_caching: bool = False

An adapter config is config options that do NOT impact the output of the model.

For example: whether the run is saved, or whether we request additional data like logprobs.

AdapterConfig( allow_saving: bool = True, top_logprobs: int | None = None, default_tags: list[str] | None = None, task_run_config_id: str | None = None, prompt_builder: kiln_ai.adapters.prompt_builders.BasePromptBuilder | None = None, skills: Optional[Dict[str, kiln_ai.datamodel.Skill]] = None, return_on_tool_call: bool = False, unmanaged_tools: list[kiln_ai.tools.KilnToolInterface] | None = None, automatic_prompt_caching: bool = False)
allow_saving: bool = True
top_logprobs: int | None = None
default_tags: list[str] | None = None

The ID of the TaskRunConfig that originated this run, if any. Stored on the resulting TaskRun so the run can be traced back to its originating saved config (in addition to the inline run_config snapshot). None for ad-hoc/inline runs that were not initiated from a saved TaskRunConfig.

task_run_config_id: str | None = None

A custom prompt builder can be injected to override the system prompt building process. If not provided, the prompt builder will be created from the run_config.prompt_id which may load additional files from disk.

Pre-loaded skills keyed by skill ID. When the run config references skills, they are looked up from this dict instead of reading from the filesystem. Use load_skills_for_task() to build this dict.

skills: Optional[Dict[str, kiln_ai.datamodel.Skill]] = None

When True, the adapter will stop and return control to the caller when a tool call is invoked, instead of processing tool calls internally. Default is False (process tool calls internally).

return_on_tool_call: bool = False

Extra tools provided directly by the caller, in addition to tools resolved from the task's tool registry. These are sent to the model together with registry tools, and their names must not collide with registry tool names.

If return_on_tool_call is False (the default), the adapter executes these tools itself just like registry tools. If True, the adapter returns as soon as the model requests a tool call and the caller is responsible for running the tool and passing results back via prior_trace.

unmanaged_tools: list[kiln_ai.tools.KilnToolInterface] | None = None

When True, automatically inject prompt caching hints into completion requests. This is a cost optimization and does not affect model output.

automatic_prompt_caching: bool = False
class BaseAdapter:
141class BaseAdapter(metaclass=ABCMeta):
142    """Base class for AI model adapters that handle task execution.
143
144    This abstract class provides the foundation for implementing model-specific adapters
145    that can process tasks with structured or unstructured inputs/outputs. It handles
146    input/output validation, prompt building, and run tracking.
147
148    Prompt building is handled internally by the adapter, which uses a prompt builder
149    based on the run config. To override the prompt building behavior, pass a custom prompt
150    builder to the adapter config.
151    """
152
153    def __init__(
154        self,
155        task: Task,
156        run_config: RunConfigProperties,
157        config: AdapterConfig | None = None,
158    ):
159        self.task = task
160        self.run_config: RunConfigProperties = run_config
161        self.base_adapter_config = config or AdapterConfig()
162
163        if isinstance(run_config, KilnAgentRunConfigProperties):
164            self.update_run_config_unknown_structured_output_mode()
165            self.prompt_builder = (
166                self.base_adapter_config.prompt_builder
167                or prompt_builder_from_id(run_config.prompt_id, task)
168            )
169        else:
170            self.prompt_builder = None
171        self._model_provider: KilnModelProvider | None = None
172        self._resolved_skills: list[Skill] | None = None
173
174        self.output_schema = task.output_json_schema
175        self.input_schema = task.input_json_schema
176
177    def model_provider(self) -> KilnModelProvider:
178        """
179        Lazy load the model provider for this adapter.
180        """
181        if self._model_provider is not None:
182            return self._model_provider
183        run_config = as_kiln_agent_run_config(self.run_config)
184        if not run_config.model_name or not run_config.model_provider_name:
185            raise ValueError("model_name and model_provider_name must be provided")
186        self._model_provider = kiln_model_provider_from(
187            run_config.model_name, run_config.model_provider_name
188        )
189        if not self._model_provider:
190            raise ValueError(
191                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
192            )
193        return self._model_provider
194
195    @staticmethod
196    def _normalize_prior_trace(
197        prior_trace: list[ChatCompletionMessageParam] | None,
198    ) -> list[ChatCompletionMessageParam] | None:
199        if not prior_trace:
200            return None
201        return prior_trace
202
203    def _reject_multiturn_with_structured_input(
204        self,
205        prior_trace: list[ChatCompletionMessageParam] | None,
206    ) -> None:
207        if prior_trace is not None and self.input_schema is not None:
208            raise ValueError(
209                "Cannot run multiturn execution with a task that has a structured input schema. "
210                "Use an unstructured task, or call without prior_trace."
211            )
212
213    async def invoke(
214        self,
215        input: InputType,
216        input_source: DataSource | None = None,
217        prior_trace: list[ChatCompletionMessageParam] | None = None,
218        parent_task_run: TaskRun | None = None,
219    ) -> TaskRun:
220        task_run, _ = await self.invoke_returning_run_output(
221            input, input_source, prior_trace, parent_task_run
222        )
223        return task_run
224
225    async def _run_returning_run_output(
226        self,
227        input: InputType,
228        input_source: DataSource | None = None,
229        prior_trace: list[ChatCompletionMessageParam] | None = None,
230        parent_task_run: TaskRun | None = None,
231    ) -> Tuple[TaskRun, RunOutput]:
232        # Pre-run validation: these checks run before any model call, so
233        # there is no trace to preserve. They stay outside the
234        # exception-wrapping block and surface as plain exceptions.
235        prior_trace = self._normalize_prior_trace(prior_trace)
236        self._reject_multiturn_with_structured_input(prior_trace)
237
238        if self.input_schema is not None:
239            validate_schema_with_value_error(
240                input,
241                self.input_schema,
242                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
243                require_object=False,
244            )
245
246        # Apply input transform if configured. The original `input` is preserved
247        # for TaskRun.input persistence; `model_input` is what the model sees.
248        model_input = self._apply_input_transform(input)
249
250        # Format model input for model call (we save the original input in the
251        # task without formatting). This runs in the adapter but before any
252        # trace is built, so it also stays outside the wrapped region.
253        formatted_input = model_input
254        formatter_id = self.model_provider().formatter
255        if formatter_id is not None:
256            formatter = request_formatter_from_id(formatter_id)
257            formatted_input = formatter.format_input(model_input)
258
259        # Allocate the trace-so-far list here so the reference survives any
260        # exception thrown from inside `_run` (or the post-processing that
261        # follows). `_run` must mutate this list in place (extend/append,
262        # or `list[:] = ...`) and never rebind the local name.
263        trace_ref: list[ChatCompletionMessageParam] = []
264        try:
265            run_output, usage = await self._run(
266                formatted_input, trace_ref, prior_trace=prior_trace
267            )
268
269            if not run_output.is_toolcall_pending:
270                # Normal completion: parse and validate output
271                provider = self.model_provider()
272                parser = model_parser_from_id(provider.parser)
273                parsed_output = parser.parse_output(original_output=run_output)
274
275                # validate output
276                if self.output_schema is not None:
277                    # Parse json to dict if we have structured output
278                    if isinstance(parsed_output.output, str):
279                        parsed_output.output = parse_json_string(parsed_output.output)
280
281                    if not isinstance(parsed_output.output, dict):
282                        raise RuntimeError(
283                            f"structured response is not a dict: {parsed_output.output}"
284                        )
285                    validate_schema_with_value_error(
286                        parsed_output.output,
287                        self.output_schema,
288                        "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
289                    )
290                else:
291                    if not isinstance(parsed_output.output, str):
292                        raise RuntimeError(
293                            f"response is not a string for non-structured task: {parsed_output.output}"
294                        )
295
296                trace_has_toolcalls = parsed_output.trace is not None and any(
297                    message.get("role", None) == "tool"
298                    for message in parsed_output.trace
299                )
300
301                # Validate reasoning content is present if required.
302                # Models often skip reasoning on the final turn when tools are involved, so we don't require it then.
303                if (
304                    provider.reasoning_capable
305                    and (
306                        not parsed_output.intermediate_outputs
307                        or "reasoning" not in parsed_output.intermediate_outputs
308                    )
309                    and not (
310                        provider.reasoning_optional_for_structured_output
311                        and self.has_structured_output()
312                    )
313                    and not trace_has_toolcalls
314                ):
315                    raise RuntimeError(
316                        "Reasoning is required for this model, but no reasoning was returned."
317                    )
318
319                run_output = parsed_output
320
321            run = self.generate_run(
322                input,
323                input_source,
324                run_output,
325                usage,
326                run_output.trace,
327                parent_task_run,
328            )
329
330            # Save the run if configured to do so, and we have a path to save to
331            if (
332                self.base_adapter_config.allow_saving
333                and Config.shared().autosave_runs
334                and self.task.path is not None
335            ):
336                run.save_to_file()
337            else:
338                # Clear the ID to indicate it's not persisted
339                run.id = None
340
341            return run, run_output
342        except KilnRunError:
343            # Already wrapped — pass through so we don't double-wrap.
344            raise
345        except Exception as e:
346            # Trace conversion can itself throw (e.g., a malformed partial
347            # assistant message was appended to `messages` just before the
348            # real failure). Never let that swallow the original exception —
349            # fall back to no trace so the user still sees the real error.
350            partial_trace: list[ChatCompletionMessageParam] | None = None
351            if trace_ref:
352                try:
353                    partial_trace = self._messages_to_trace(trace_ref)
354                except Exception:
355                    partial_trace = None
356            raise KilnRunError(
357                message=format_error_message(e),
358                partial_trace=partial_trace,
359                original=e,
360            ) from e
361
362    async def invoke_returning_run_output(
363        self,
364        input: InputType,
365        input_source: DataSource | None = None,
366        prior_trace: list[ChatCompletionMessageParam] | None = None,
367        parent_task_run: TaskRun | None = None,
368    ) -> Tuple[TaskRun, RunOutput]:
369        # Determine if this is the root agent (no existing run context)
370        is_root_agent = get_agent_run_id() is None
371
372        if is_root_agent:
373            run_id = generate_agent_run_id()
374            set_agent_run_id(run_id)
375
376        try:
377            return await self._run_returning_run_output(
378                input, input_source, prior_trace, parent_task_run
379            )
380        finally:
381            if is_root_agent:
382                try:
383                    run_id = get_agent_run_id()
384                    if run_id:
385                        await MCPSessionManager.shared().cleanup_session(run_id)
386                finally:
387                    clear_agent_run_id()
388
389    def invoke_openai_stream(
390        self,
391        input: InputType,
392        input_source: DataSource | None = None,
393        prior_trace: list[ChatCompletionMessageParam] | None = None,
394        parent_task_run: TaskRun | None = None,
395    ) -> OpenAIStreamResult:
396        """Stream raw OpenAI-protocol chunks for the task execution.
397
398        Returns an async-iterable that yields ``ModelResponseStream`` chunks
399        as they arrive from the model.  After the iterator is exhausted the
400        run has been validated and saved (when configured).  The resulting
401        ``TaskRun`` is available via the ``.task_run`` property.
402
403        Tool-call rounds happen internally and are not surfaced; use
404        ``invoke_ai_sdk_stream`` if you need tool-call events.
405        """
406        return OpenAIStreamResult(
407            self, input, input_source, prior_trace, parent_task_run
408        )
409
410    def invoke_ai_sdk_stream(
411        self,
412        input: InputType,
413        input_source: DataSource | None = None,
414        prior_trace: list[ChatCompletionMessageParam] | None = None,
415        parent_task_run: TaskRun | None = None,
416    ) -> AiSdkStreamResult:
417        """Stream AI SDK protocol events for the task execution.
418
419        Returns an async-iterable that yields ``AiSdkStreamEvent`` instances
420        covering text, reasoning, tool-call lifecycle, step boundaries, and
421        control events.  After the iterator is exhausted the resulting
422        ``TaskRun`` is available via the ``.task_run`` property.
423        """
424        return AiSdkStreamResult(
425            self, input, input_source, prior_trace, parent_task_run
426        )
427
428    def _prepare_stream(
429        self,
430        input: InputType,
431        prior_trace: list[ChatCompletionMessageParam] | None,
432    ) -> AdapterStream:
433        prior_trace = self._normalize_prior_trace(prior_trace)
434        self._reject_multiturn_with_structured_input(prior_trace)
435
436        if self.input_schema is not None:
437            validate_schema_with_value_error(
438                input,
439                self.input_schema,
440                "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
441                require_object=False,
442            )
443
444        model_input = self._apply_input_transform(input)
445
446        formatted_input = model_input
447        formatter_id = self.model_provider().formatter
448        if formatter_id is not None:
449            formatter = request_formatter_from_id(formatter_id)
450            formatted_input = formatter.format_input(model_input)
451
452        return self._create_run_stream(formatted_input, prior_trace)
453
454    def _finalize_stream(
455        self,
456        adapter_stream: AdapterStream,
457        input: InputType,
458        input_source: DataSource | None,
459        parent_task_run: TaskRun | None = None,
460    ) -> TaskRun:
461        """Streaming invocations are only concerned with passing through events as they come in.
462        At the end of the stream, we still need to validate the output, create a run and everything
463        else that a non-streaming invocation would do.
464        """
465
466        result: AdapterStreamResult = adapter_stream.result
467        run_output = result.run_output
468        usage = result.usage
469
470        if not run_output.is_toolcall_pending:
471            # Normal completion: parse and validate output
472            provider = self.model_provider()
473            parser = model_parser_from_id(provider.parser)
474            parsed_output = parser.parse_output(original_output=run_output)
475
476            if self.output_schema is not None:
477                if isinstance(parsed_output.output, str):
478                    parsed_output.output = parse_json_string(parsed_output.output)
479                if not isinstance(parsed_output.output, dict):
480                    raise RuntimeError(
481                        f"structured response is not a dict: {parsed_output.output}"
482                    )
483                validate_schema_with_value_error(
484                    parsed_output.output,
485                    self.output_schema,
486                    "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.",
487                )
488            else:
489                if not isinstance(parsed_output.output, str):
490                    raise RuntimeError(
491                        f"response is not a string for non-structured task: {parsed_output.output}"
492                    )
493
494            trace_has_toolcalls = parsed_output.trace is not None and any(
495                message.get("role", None) == "tool" for message in parsed_output.trace
496            )
497            if (
498                provider.reasoning_capable
499                and (
500                    not parsed_output.intermediate_outputs
501                    or "reasoning" not in parsed_output.intermediate_outputs
502                )
503                and not (
504                    provider.reasoning_optional_for_structured_output
505                    and self.has_structured_output()
506                )
507                and not trace_has_toolcalls
508            ):
509                raise RuntimeError(
510                    "Reasoning is required for this model, but no reasoning was returned."
511                )
512
513            run_output = parsed_output
514
515        run = self.generate_run(
516            input, input_source, run_output, usage, run_output.trace, parent_task_run
517        )
518
519        if (
520            self.base_adapter_config.allow_saving
521            and Config.shared().autosave_runs
522            and self.task.path is not None
523        ):
524            run.save_to_file()
525        else:
526            run.id = None
527
528        return run
529
530    def _apply_input_transform(self, input: InputType) -> InputType:
531        """If the run config has an input_transform, render it and return the
532        resulting string. Otherwise return input unchanged.
533
534        MCP run configs (no input_transform field) are a no-op.
535        """
536        if not isinstance(self.run_config, KilnAgentRunConfigProperties):
537            return input
538        transform = self.run_config.input_transform
539        if transform is None:
540            return input
541        try:
542            return render_input_transform(transform, input)
543        except Exception as e:
544            raise ValueError(f"Input transform failed: {e}") from e
545
546    def has_structured_output(self) -> bool:
547        return self.output_schema is not None
548
549    @abstractmethod
550    def adapter_name(self) -> str:
551        pass
552
553    @abstractmethod
554    async def _run(
555        self,
556        input: InputType,
557        trace_ref: list[ChatCompletionMessageParam],
558        prior_trace: list[ChatCompletionMessageParam] | None = None,
559    ) -> Tuple[RunOutput, Usage | None]:
560        """Run the model. Implementations MUST mutate `trace_ref` in place
561        (extend/append, or `trace_ref[:] = ...`) — never rebind it — so the
562        caller keeps a live reference to the partial trace if an exception
563        escapes.
564        """
565        pass
566
567    def _messages_to_trace(
568        self,
569        messages: list[ChatCompletionMessageParam],
570    ) -> list[ChatCompletionMessageParam]:
571        """Convert the adapter's internal `messages` list to an API-safe trace.
572
573        Default implementation returns the list as-is. Adapters that store
574        internal message objects (e.g. LiteLLM's `Message`) should override
575        this to normalize to `ChatCompletionMessageParam` shapes.
576        """
577        return messages
578
579    def _create_run_stream(
580        self,
581        input: InputType,
582        prior_trace: list[ChatCompletionMessageParam] | None = None,
583    ) -> AdapterStream:
584        """Create a stream for the adapter. Implementations must override this method to support streaming."""
585        raise NotImplementedError("Streaming is not supported for this adapter type")
586
587    def build_prompt(self) -> str:
588        if self.prompt_builder is None:
589            raise ValueError("Prompt builder is not available for MCP run config")
590        # The prompt builder needs to know if we want to inject formatting instructions
591        structured_output_mode = as_kiln_agent_run_config(
592            self.run_config
593        ).structured_output_mode
594        add_json_instructions = self.has_structured_output() and (
595            structured_output_mode == StructuredOutputMode.json_instructions
596            or structured_output_mode
597            == StructuredOutputMode.json_instruction_and_object
598        )
599
600        return self.prompt_builder.build_prompt(
601            include_json_instructions=add_json_instructions,
602            skills=self._resolve_skills(),
603        )
604
605    def _resolve_skills(self) -> list[Skill]:
606        """Resolve skills from the injected skills dict.
607
608        Uses the pre-loaded skills dict from AdapterConfig. Caches the result
609        so that build_prompt and available_tools don't repeat
610        the lookup. Raises ValueError if the run config references a skill
611        that is not in the injected dict.
612        """
613        if self._resolved_skills is not None:
614            return self._resolved_skills
615
616        if self.run_config.type != "kiln_agent":
617            self._resolved_skills = []
618            return self._resolved_skills
619
620        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
621        if tool_config is None or tool_config.tools is None:
622            self._resolved_skills = []
623            return self._resolved_skills
624
625        skill_tool_ids = [
626            tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX)
627        ]
628        if not skill_tool_ids:
629            self._resolved_skills = []
630            return self._resolved_skills
631
632        injected = self.base_adapter_config.skills
633        if injected is None:
634            raise ValueError(
635                "Run config references skills but no skills dict was provided via "
636                "AdapterConfig(skills=...). Use load_skills_for_task() to pre-load "
637                "skills and pass them to the adapter."
638            )
639
640        skills: list[Skill] = []
641        seen: set[str] = set()
642        for tool_id in skill_tool_ids:
643            sid = skill_id_from_tool_id(tool_id)
644            if sid not in injected:
645                raise ValueError(
646                    f"Skill {sid} referenced in run config but not found in the "
647                    "injected skills dict."
648                )
649            if sid in seen:
650                continue
651            seen.add(sid)
652            skills.append(injected[sid])
653
654        self._resolved_skills = skills
655        return self._resolved_skills
656
657    def build_chat_formatter(
658        self,
659        input: InputType,
660        prior_trace: list[ChatCompletionMessageParam] | None = None,
661    ) -> ChatFormatter:
662        prior_trace = self._normalize_prior_trace(prior_trace)
663        self._reject_multiturn_with_structured_input(prior_trace)
664        if prior_trace is not None:
665            return MultiturnFormatter(prior_trace, input)
666        if self.prompt_builder is None:
667            raise ValueError("Prompt builder is not available for MCP run config")
668        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
669
670        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
671        system_message = self.build_prompt()
672
673        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
674        if not cot_prompt:
675            return get_chat_formatter(
676                strategy=ChatStrategy.single_turn,
677                system_message=system_message,
678                user_input=input,
679            )
680
681        # Some models like finetunes are trained with a specific chat strategy. Use that.
682        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
683        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
684        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
685            return get_chat_formatter(
686                strategy=tuned_chat_strategy,
687                system_message=system_message,
688                user_input=input,
689                thinking_instructions=cot_prompt,
690            )
691
692        # Pick the best chat strategy for the model given it has a cot prompt.
693        reasoning_capable = self.model_provider().reasoning_capable
694        if reasoning_capable:
695            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
696            # A simple message with the COT prompt appended to the message list is sufficient
697            return get_chat_formatter(
698                strategy=ChatStrategy.single_turn_r1_thinking,
699                system_message=system_message,
700                user_input=input,
701                thinking_instructions=cot_prompt,
702            )
703        else:
704            # Unstructured output with COT
705            # Two calls to separate the thinking from the final response
706            return get_chat_formatter(
707                strategy=ChatStrategy.two_message_cot,
708                system_message=system_message,
709                user_input=input,
710                thinking_instructions=cot_prompt,
711            )
712
713    # create a run and task output
714    def generate_run(
715        self,
716        input: InputType,
717        input_source: DataSource | None,
718        run_output: RunOutput,
719        usage: Usage | None = None,
720        trace: list[ChatCompletionMessageParam] | None = None,
721        parent_task_run: TaskRun | None = None,
722    ) -> TaskRun:
723        output_str = (
724            json.dumps(run_output.output, ensure_ascii=False)
725            if isinstance(run_output.output, dict)
726            else run_output.output
727        )
728
729        output_source_type = (
730            DataSourceType.tool_call
731            if self.run_config.type == "mcp"
732            else DataSourceType.synthetic
733        )
734
735        new_output = TaskOutput(
736            output=output_str,
737            source=DataSource(
738                type=output_source_type,
739                properties=self._properties_for_task_output(),
740                run_config_id=self.base_adapter_config.task_run_config_id,
741                run_config=self.run_config,
742            ),
743        )
744
745        # Convert input and output to JSON strings if they aren't strings
746        input_str = (
747            input if isinstance(input, str) else json.dumps(input, ensure_ascii=False)
748        )
749
750        if input_source is None:
751            input_source = DataSource(
752                type=DataSourceType.human,
753                properties={"created_by": Config.shared().user_id},
754            )
755
756        parent_task_run_id: str | None = None
757        if parent_task_run is not None:
758            if parent_task_run.id is None:
759                raise ValueError(
760                    "parent_task_run must be persisted before using as parent: save the parent "
761                    "TaskRun (e.g. save_to_file()) so it has a stable id."
762                )
763            parent_task_run_id = parent_task_run.id
764
765        return TaskRun(
766            parent=self.task,
767            parent_task_run_id=parent_task_run_id,
768            input=input_str,
769            input_source=input_source,
770            output=new_output,
771            intermediate_outputs=run_output.intermediate_outputs,
772            tags=self.base_adapter_config.default_tags or [],
773            usage=usage,
774            trace=trace,
775            cumulative_usage=MessageUsage.from_trace(trace),
776        )
777
778    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
779        match self.run_config.type:
780            case "mcp":
781                return {}
782            case "kiln_agent":
783                if not isinstance(self.run_config, KilnAgentRunConfigProperties):
784                    raise ValueError("Kiln agent run config is required")
785                run_config = self.run_config
786
787                props: Dict[str, str | int | float] = {}
788                props["adapter_name"] = self.adapter_name()
789                # Legacy properties where we save the run_config details into custom properties.
790                # These are now also be saved in the run_config field.
791                props["model_name"] = run_config.model_name
792                props["model_provider"] = run_config.model_provider_name
793                props["prompt_id"] = run_config.prompt_id
794                props["structured_output_mode"] = run_config.structured_output_mode
795                props["temperature"] = run_config.temperature
796                props["top_p"] = run_config.top_p
797
798                return props
799            case _:
800                raise_exhaustive_enum_error(self.run_config.type)
801
802    def update_run_config_unknown_structured_output_mode(self) -> None:
803        if self.run_config.type != "kiln_agent":
804            return
805        run_config = as_kiln_agent_run_config(self.run_config)
806        structured_output_mode = run_config.structured_output_mode
807
808        # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it.
809        # Look up our recommended mode from ml_model_list if we have one
810        if structured_output_mode == StructuredOutputMode.unknown:
811            new_run_config = run_config.model_copy(deep=True)
812            structured_output_mode = default_structured_output_mode_for_model_provider(
813                run_config.model_name,
814                run_config.model_provider_name,
815            )
816            new_run_config.structured_output_mode = structured_output_mode
817            self.run_config = new_run_config
818
819    async def available_tools(self) -> list[KilnToolInterface]:
820        if self.run_config.type != "kiln_agent":
821            return []
822        tool_config = as_kiln_agent_run_config(self.run_config).tools_config
823        if tool_config is None or tool_config.tools is None:
824            return []
825
826        non_skill_tool_ids = [
827            tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX)
828        ]
829
830        tools: list[KilnToolInterface] = [
831            tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids
832        ]
833
834        skills = self._resolve_skills()
835        if skills:
836            seen_names: set[str] = set()
837            for skill in skills:
838                if skill.name in seen_names:
839                    raise ValueError(
840                        f"Duplicate skill name '{skill.name}'. Each skill must have a unique name."
841                    )
842                seen_names.add(skill.name)
843            tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))
844
845        tool_names = [await tool.name() for tool in tools]
846        if len(tool_names) != len(set(tool_names)):
847            raise ValueError(
848                "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
849            )
850
851        return tools

Base class for AI model adapters that handle task execution.

This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.

Prompt building is handled internally by the adapter, which uses a prompt builder chosen from the run config. To override the prompt-building behavior, pass a custom prompt builder in the adapter config. MCP run configs have no prompt builder, so prompt building is unavailable for them.

task
run_config: Annotated[Union[Annotated[kiln_ai.datamodel.run_config.KilnAgentRunConfigProperties, Tag(tag='kiln_agent')], Annotated[kiln_ai.datamodel.run_config.McpRunConfigProperties, Tag(tag='mcp')]], Discriminator(discriminator=<function _get_run_config_type at 0x7ff7a131b880>, custom_error_type=None, custom_error_message=None, custom_error_context=None)]
base_adapter_config
output_schema
input_schema
def model_provider(self) -> kiln_ai.adapters.ml_model_list.KilnModelProvider:
177    def model_provider(self) -> KilnModelProvider:
178        """
179        Lazy load the model provider for this adapter.
180        """
181        if self._model_provider is not None:
182            return self._model_provider
183        run_config = as_kiln_agent_run_config(self.run_config)
184        if not run_config.model_name or not run_config.model_provider_name:
185            raise ValueError("model_name and model_provider_name must be provided")
186        self._model_provider = kiln_model_provider_from(
187            run_config.model_name, run_config.model_provider_name
188        )
189        if not self._model_provider:
190            raise ValueError(
191                f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}"
192            )
193        return self._model_provider

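Lazily load the model provider for this adapter and cache it for subsequent calls. Raises ValueError if the run config does not set both a model name and a model provider name, or if no matching provider is found.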

async def invoke( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> kiln_ai.datamodel.TaskRun:
213    async def invoke(
214        self,
215        input: InputType,
216        input_source: DataSource | None = None,
217        prior_trace: list[ChatCompletionMessageParam] | None = None,
218        parent_task_run: TaskRun | None = None,
219    ) -> TaskRun:
220        task_run, _ = await self.invoke_returning_run_output(
221            input, input_source, prior_trace, parent_task_run
222        )
223        return task_run
async def invoke_returning_run_output( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> Tuple[kiln_ai.datamodel.TaskRun, kiln_ai.adapters.run_output.RunOutput]:
362    async def invoke_returning_run_output(
363        self,
364        input: InputType,
365        input_source: DataSource | None = None,
366        prior_trace: list[ChatCompletionMessageParam] | None = None,
367        parent_task_run: TaskRun | None = None,
368    ) -> Tuple[TaskRun, RunOutput]:
369        # Determine if this is the root agent (no existing run context)
370        is_root_agent = get_agent_run_id() is None
371
372        if is_root_agent:
373            run_id = generate_agent_run_id()
374            set_agent_run_id(run_id)
375
376        try:
377            return await self._run_returning_run_output(
378                input, input_source, prior_trace, parent_task_run
379            )
380        finally:
381            if is_root_agent:
382                try:
383                    run_id = get_agent_run_id()
384                    if run_id:
385                        await MCPSessionManager.shared().cleanup_session(run_id)
386                finally:
387                    clear_agent_run_id()
def invoke_openai_stream( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> OpenAIStreamResult:
389    def invoke_openai_stream(
390        self,
391        input: InputType,
392        input_source: DataSource | None = None,
393        prior_trace: list[ChatCompletionMessageParam] | None = None,
394        parent_task_run: TaskRun | None = None,
395    ) -> OpenAIStreamResult:
396        """Stream raw OpenAI-protocol chunks for the task execution.
397
398        Returns an async-iterable that yields ``ModelResponseStream`` chunks
399        as they arrive from the model.  After the iterator is exhausted the
400        run has been validated and saved (when configured).  The resulting
401        ``TaskRun`` is available via the ``.task_run`` property.
402
403        Tool-call rounds happen internally and are not surfaced; use
404        ``invoke_ai_sdk_stream`` if you need tool-call events.
405        """
406        return OpenAIStreamResult(
407            self, input, input_source, prior_trace, parent_task_run
408        )

Stream raw OpenAI-protocol chunks for the task execution.

Returns an async-iterable that yields ModelResponseStream chunks as they arrive from the model. After the iterator is exhausted, the run has been validated and saved (when configured). The resulting TaskRun is available via the .task_run property.

Tool-call rounds happen internally and are not surfaced; use invoke_ai_sdk_stream if you need tool-call events.

def invoke_ai_sdk_stream( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None = None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> AiSdkStreamResult:
410    def invoke_ai_sdk_stream(
411        self,
412        input: InputType,
413        input_source: DataSource | None = None,
414        prior_trace: list[ChatCompletionMessageParam] | None = None,
415        parent_task_run: TaskRun | None = None,
416    ) -> AiSdkStreamResult:
417        """Stream AI SDK protocol events for the task execution.
418
419        Returns an async-iterable that yields ``AiSdkStreamEvent`` instances
420        covering text, reasoning, tool-call lifecycle, step boundaries, and
421        control events.  After the iterator is exhausted the resulting
422        ``TaskRun`` is available via the ``.task_run`` property.
423        """
424        return AiSdkStreamResult(
425            self, input, input_source, prior_trace, parent_task_run
426        )

Stream AI SDK protocol events for the task execution.

Returns an async-iterable that yields AiSdkStreamEvent instances covering text, reasoning, tool-call lifecycle, step boundaries, and control events. After the iterator is exhausted, the resulting TaskRun is available via the .task_run property.

def has_structured_output(self) -> bool:
546    def has_structured_output(self) -> bool:
547        return self.output_schema is not None
@abstractmethod
def adapter_name(self) -> str:
549    @abstractmethod
550    def adapter_name(self) -> str:
551        pass
def build_prompt(self) -> str:
587    def build_prompt(self) -> str:
588        if self.prompt_builder is None:
589            raise ValueError("Prompt builder is not available for MCP run config")
590        # The prompt builder needs to know if we want to inject formatting instructions
591        structured_output_mode = as_kiln_agent_run_config(
592            self.run_config
593        ).structured_output_mode
594        add_json_instructions = self.has_structured_output() and (
595            structured_output_mode == StructuredOutputMode.json_instructions
596            or structured_output_mode
597            == StructuredOutputMode.json_instruction_and_object
598        )
599
600        return self.prompt_builder.build_prompt(
601            include_json_instructions=add_json_instructions,
602            skills=self._resolve_skills(),
603        )
def build_chat_formatter( self, input: Union[Dict[str, Any], List[Any], str], prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None) -> kiln_ai.adapters.chat.ChatFormatter:
657    def build_chat_formatter(
658        self,
659        input: InputType,
660        prior_trace: list[ChatCompletionMessageParam] | None = None,
661    ) -> ChatFormatter:
662        prior_trace = self._normalize_prior_trace(prior_trace)
663        self._reject_multiturn_with_structured_input(prior_trace)
664        if prior_trace is not None:
665            return MultiturnFormatter(prior_trace, input)
666        if self.prompt_builder is None:
667            raise ValueError("Prompt builder is not available for MCP run config")
668        # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy.
669
670        cot_prompt = self.prompt_builder.chain_of_thought_prompt()
671        system_message = self.build_prompt()
672
673        # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt.
674        if not cot_prompt:
675            return get_chat_formatter(
676                strategy=ChatStrategy.single_turn,
677                system_message=system_message,
678                user_input=input,
679            )
680
681        # Some models like finetunes are trained with a specific chat strategy. Use that.
682        # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy.
683        tuned_chat_strategy = self.model_provider().tuned_chat_strategy
684        if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn:
685            return get_chat_formatter(
686                strategy=tuned_chat_strategy,
687                system_message=system_message,
688                user_input=input,
689                thinking_instructions=cot_prompt,
690            )
691
692        # Pick the best chat strategy for the model given it has a cot prompt.
693        reasoning_capable = self.model_provider().reasoning_capable
694        if reasoning_capable:
695            # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format.
696            # A simple message with the COT prompt appended to the message list is sufficient
697            return get_chat_formatter(
698                strategy=ChatStrategy.single_turn_r1_thinking,
699                system_message=system_message,
700                user_input=input,
701                thinking_instructions=cot_prompt,
702            )
703        else:
704            # Unstructured output with COT
705            # Two calls to separate the thinking from the final response
706            return get_chat_formatter(
707                strategy=ChatStrategy.two_message_cot,
708                system_message=system_message,
709                user_input=input,
710                thinking_instructions=cot_prompt,
711            )
def generate_run( self, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, run_output: kiln_ai.adapters.run_output.RunOutput, usage: kiln_ai.datamodel.Usage | None = None, trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None = None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None) -> kiln_ai.datamodel.TaskRun:
def generate_run(
    self,
    input: InputType,
    input_source: DataSource | None,
    run_output: RunOutput,
    usage: Usage | None = None,
    trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> TaskRun:
    """Build a TaskRun describing one completed adapter run.

    Args:
        input: The task input. Strings are stored as-is; anything else is
            stored as JSON text (non-ASCII preserved).
        input_source: Where the input came from. When None, a human source
            attributed to ``Config.shared().user_id`` is used.
        run_output: The adapter's result. A dict ``output`` is stored as JSON
            text; any other value is stored unchanged. Its
            ``intermediate_outputs`` are copied onto the TaskRun.
        usage: Usage for this run, stored on the TaskRun unchanged.
        trace: Message trace stored on the TaskRun; it is also passed to
            ``MessageUsage.from_trace`` to compute ``cumulative_usage``.
        parent_task_run: Optional parent run; it must already have an ``id``.

    Returns:
        A new TaskRun whose parent is ``self.task``. This method only
        constructs it; it does not save it.

    Raises:
        ValueError: If ``parent_task_run`` is given but its ``id`` is None.
    """
    # Structured (dict) outputs are serialized; other outputs pass through.
    raw_output = run_output.output
    if isinstance(raw_output, dict):
        serialized_output = json.dumps(raw_output, ensure_ascii=False)
    else:
        serialized_output = raw_output

    # MCP run configs are recorded as tool calls; all others as synthetic.
    if self.run_config.type == "mcp":
        source_type = DataSourceType.tool_call
    else:
        source_type = DataSourceType.synthetic

    task_output = TaskOutput(
        output=serialized_output,
        source=DataSource(
            type=source_type,
            properties=self._properties_for_task_output(),
            run_config_id=self.base_adapter_config.task_run_config_id,
            run_config=self.run_config,
        ),
    )

    if isinstance(input, str):
        serialized_input = input
    else:
        serialized_input = json.dumps(input, ensure_ascii=False)

    if input_source is None:
        input_source = DataSource(
            type=DataSourceType.human,
            properties={"created_by": Config.shared().user_id},
        )

    # A parent can only be referenced once it has a stable id.
    if parent_task_run is None:
        parent_id: str | None = None
    elif parent_task_run.id is None:
        raise ValueError(
            "parent_task_run must be persisted before using as parent: save the parent "
            "TaskRun (e.g. save_to_file()) so it has a stable id."
        )
    else:
        parent_id = parent_task_run.id

    return TaskRun(
        parent=self.task,
        parent_task_run_id=parent_id,
        input=serialized_input,
        input_source=input_source,
        output=task_output,
        intermediate_outputs=run_output.intermediate_outputs,
        tags=self.base_adapter_config.default_tags or [],
        usage=usage,
        trace=trace,
        cumulative_usage=MessageUsage.from_trace(trace),
    )
def update_run_config_unknown_structured_output_mode(self) -> None:
def update_run_config_unknown_structured_output_mode(self) -> None:
    """Fill in a missing structured output mode on a kiln_agent run config.

    Older datamodels did not save the structured output mode, and some
    clients (tests, end users) may not set it. When ``self.run_config`` is a
    ``kiln_agent`` config whose ``structured_output_mode`` is
    ``StructuredOutputMode.unknown``, ``self.run_config`` is replaced with a
    deep copy. The copy's mode is set to whatever
    ``default_structured_output_mode_for_model_provider`` recommends for the
    config's model name and provider. Other run configs are left untouched.
    """
    if self.run_config.type != "kiln_agent":
        return

    agent_config = as_kiln_agent_run_config(self.run_config)
    if agent_config.structured_output_mode != StructuredOutputMode.unknown:
        return

    # Work on a copy so the original config object is never mutated.
    patched_config = agent_config.model_copy(deep=True)
    patched_config.structured_output_mode = (
        default_structured_output_mode_for_model_provider(
            agent_config.model_name,
            agent_config.model_provider_name,
        )
    )
    self.run_config = patched_config
async def available_tools(self) -> list[kiln_ai.tools.KilnToolInterface]:
async def available_tools(self) -> list[KilnToolInterface]:
    """Instantiate the tools configured on a kiln_agent run config.

    Returns an empty list when the run config is not ``kiln_agent`` or when it
    has no tool ids. Tool ids that start with ``SKILL_TOOL_ID_PREFIX`` are not
    built individually. Instead, the skills from ``self._resolve_skills()``
    (if any) are bundled into a single ``SkillTool`` with id
    ``f"{SKILL_TOOL_ID_PREFIX}_combined"``, appended after the other tools.

    Raises:
        ValueError: If two skills share a name, or if the resulting tools do
            not all have distinct ``name()`` values.
    """
    if self.run_config.type != "kiln_agent":
        return []
    tools_config = as_kiln_agent_run_config(self.run_config).tools_config
    if tools_config is None or tools_config.tools is None:
        return []

    # Skill ids are handled collectively below, not through tool_from_id.
    plain_tool_ids = list(
        filter(
            lambda tool_id: not tool_id.startswith(SKILL_TOOL_ID_PREFIX),
            tools_config.tools,
        )
    )
    resolved: list[KilnToolInterface] = []
    for plain_id in plain_tool_ids:
        resolved.append(tool_from_id(plain_id, self.task))

    skills = self._resolve_skills()
    if skills:
        known_skill_names: set[str] = set()
        for candidate in skills:
            if candidate.name in known_skill_names:
                raise ValueError(
                    f"Duplicate skill name '{candidate.name}'. Each skill must have a unique name."
                )
            known_skill_names.add(candidate.name)
        resolved.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))

    # Tool names are awaited one at a time, in tool order.
    names: list[str] = []
    for tool in resolved:
        names.append(await tool.name())
    if len(set(names)) < len(names):
        raise ValueError(
            "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
        )

    return resolved
class OpenAIStreamResult:
class OpenAIStreamResult:
    """Async iterable that streams OpenAI-style chunks produced by an adapter.

    Iterating yields every ``ModelResponseStream`` chunk from the adapter's
    prepared stream; other event kinds from that stream are not yielded.
    Once iteration completes, the resulting ``TaskRun`` is available via the
    ``.task_run`` property.

    When return_on_tool_call=True and the model requests tool calls, the
    stream will stop and ``task_run.is_toolcall_pending`` will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        # Only set once a full iteration has finished.
        self._task_run: TaskRun | None = None
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run

    @property
    def task_run(self) -> TaskRun:
        """The TaskRun produced by the last completed iteration.

        Raises:
            RuntimeError: If the stream has not been fully consumed yet.
        """
        finished = self._task_run
        if finished is not None:
            return finished
        raise RuntimeError(
            "Stream has not been fully consumed yet. "
            "Iterate over the stream before accessing .task_run"
        )

    @staticmethod
    async def _release_root_agent_run() -> None:
        """Clean up the MCP session for the current agent run id, then clear the id."""
        try:
            active_run_id = get_agent_run_id()
            if active_run_id:
                await MCPSessionManager.shared().cleanup_session(active_run_id)
        finally:
            # The id is cleared even if session cleanup raises.
            clear_agent_run_id()

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        # A new iteration invalidates any previous result.
        self._task_run = None
        # Only the outermost stream (no agent run id yet) creates the id and
        # is responsible for cleaning it up afterwards.
        owns_agent_run = get_agent_run_id() is None
        if owns_agent_run:
            set_agent_run_id(generate_agent_run_id())

        try:
            stream = self._adapter._prepare_stream(self._input, self._prior_trace)
            async for item in stream:
                if not isinstance(item, ModelResponseStream):
                    continue
                yield item
            self._task_run = self._adapter._finalize_stream(
                stream, self._input, self._input_source, self._parent_task_run
            )
        finally:
            if owns_agent_run:
                await self._release_root_agent_run()

Async-iterable wrapper around the OpenAI streaming flow.

Yields ModelResponseStream chunks. After iteration the resulting TaskRun is available via the .task_run property.

When return_on_tool_call=True and the model requests tool calls, the stream will stop and task_run.is_toolcall_pending will be True.

OpenAIStreamResult( adapter: BaseAdapter, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None)
def __init__(
    self,
    adapter: BaseAdapter,
    input: InputType,
    input_source: DataSource | None,
    prior_trace: list[ChatCompletionMessageParam] | None,
    parent_task_run: TaskRun | None = None,
) -> None:
    """Store the streaming arguments on the instance; no other work is done.

    ``_task_run`` starts as None.
    """
    self._task_run: TaskRun | None = None
    self._adapter, self._input = adapter, input
    self._input_source = input_source
    self._prior_trace, self._parent_task_run = prior_trace, parent_task_run
task_run: kiln_ai.datamodel.TaskRun
@property
def task_run(self) -> TaskRun:
    """Return the finished TaskRun.

    Raises:
        RuntimeError: If ``_task_run`` is still None (the stream has not been
            fully consumed).
    """
    if self._task_run is not None:
        return self._task_run
    message = (
        "Stream has not been fully consumed yet. "
        "Iterate over the stream before accessing .task_run"
    )
    raise RuntimeError(message)
class AiSdkStreamResult:
 916class AiSdkStreamResult:
 917    """Async-iterable wrapper around the AI SDK streaming flow.
 918
 919    Yields ``AiSdkStreamEvent`` instances.  After iteration the resulting
 920    ``TaskRun`` is available via the ``.task_run`` property.
 921
 922    When return_on_tool_call=True and the model requests tool calls, the FINISH
 923    event will have finishReason: "tool-calls" and ``task_run.is_toolcall_pending``
 924    will be True.
 925    """
 926
 927    def __init__(
 928        self,
 929        adapter: BaseAdapter,
 930        input: InputType,
 931        input_source: DataSource | None,
 932        prior_trace: list[ChatCompletionMessageParam] | None,
 933        parent_task_run: TaskRun | None = None,
 934    ) -> None:
 935        self._adapter = adapter
 936        self._input = input
 937        self._input_source = input_source
 938        self._prior_trace = prior_trace
 939        self._parent_task_run = parent_task_run
 940        self._task_run: TaskRun | None = None
 941
 942    @property
 943    def task_run(self) -> TaskRun:
 944        if self._task_run is None:
 945            raise RuntimeError(
 946                "Stream has not been fully consumed yet. "
 947                "Iterate over the stream before accessing .task_run"
 948            )
 949        return self._task_run
 950
 951    async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]:
 952        self._task_run = None
 953        is_root_agent = get_agent_run_id() is None
 954        if is_root_agent:
 955            set_agent_run_id(generate_agent_run_id())
 956
 957        try:
 958            adapter_stream = self._adapter._prepare_stream(
 959                self._input, self._prior_trace
 960            )
 961
 962            message_id = f"msg-{uuid.uuid4().hex}"
 963            converter = AiSdkStreamConverter()
 964
 965            yield StartEvent(messageId=message_id)
 966            yield StartStepEvent()
 967
 968            last_event_was_tool_call = False
 969            async for event in adapter_stream:
 970                if isinstance(event, ModelResponseStream):
 971                    if last_event_was_tool_call:
 972                        converter.reset_for_next_step()
 973                        last_event_was_tool_call = False
 974                    for ai_event in converter.convert_chunk(event):
 975                        yield ai_event
 976                elif isinstance(event, ToolCallEvent):
 977                    last_event_was_tool_call = True
 978                    for ai_event in converter.convert_tool_event(event):
 979                        yield ai_event
 980
 981            for ai_event in converter.close_open_blocks():
 982                yield ai_event
 983
 984            yield FinishStepEvent()
 985
 986            self._task_run = self._adapter._finalize_stream(
 987                adapter_stream, self._input, self._input_source, self._parent_task_run
 988            )
 989
 990            if self._task_run.is_toolcall_pending:
 991                yield FinishEvent(
 992                    messageMetadata=FinishMessageMetadata(finishReason="tool-calls"),
 993                )
 994            else:
 995                for ai_event in converter.finalize():
 996                    yield ai_event
 997        finally:
 998            if is_root_agent:
 999                try:
1000                    run_id = get_agent_run_id()
1001                    if run_id:
1002                        await MCPSessionManager.shared().cleanup_session(run_id)
1003                finally:
1004                    clear_agent_run_id()

Async-iterable wrapper around the AI SDK streaming flow.

Yields AiSdkStreamEvent instances. After iteration the resulting TaskRun is available via the .task_run property.

When return_on_tool_call=True and the model requests tool calls, the FINISH event will have finishReason: "tool-calls" and task_run.is_toolcall_pending will be True.

AiSdkStreamResult( adapter: BaseAdapter, input: Union[Dict[str, Any], List[Any], str], input_source: kiln_ai.datamodel.DataSource | None, prior_trace: list[typing.Union[openai.types.chat.chat_completion_developer_message_param.ChatCompletionDeveloperMessageParam, openai.types.chat.chat_completion_system_message_param.ChatCompletionSystemMessageParam, openai.types.chat.chat_completion_user_message_param.ChatCompletionUserMessageParam, kiln_ai.utils.open_ai_types.ChatCompletionAssistantMessageParamWrapper, kiln_ai.utils.open_ai_types.ChatCompletionToolMessageParamWrapper, openai.types.chat.chat_completion_function_message_param.ChatCompletionFunctionMessageParam]] | None, parent_task_run: kiln_ai.datamodel.TaskRun | None = None)
def __init__(
    self,
    adapter: BaseAdapter,
    input: InputType,
    input_source: DataSource | None,
    prior_trace: list[ChatCompletionMessageParam] | None,
    parent_task_run: TaskRun | None = None,
) -> None:
    """Record the constructor arguments on the instance; nothing else runs here.

    ``_task_run`` starts as None.
    """
    self._task_run: TaskRun | None = None
    self._adapter = adapter
    self._input, self._input_source = input, input_source
    self._prior_trace = prior_trace
    self._parent_task_run = parent_task_run
task_run: kiln_ai.datamodel.TaskRun
@property
def task_run(self) -> TaskRun:
    """Return the TaskRun built once the stream has been fully consumed.

    Raises:
        RuntimeError: If ``_task_run`` has not been set yet.
    """
    result = self._task_run
    if result is None:
        raise RuntimeError(
            "Stream has not been fully consumed yet. "
            "Iterate over the stream before accessing .task_run"
        )
    return result