# kiln_ai.adapters.model_adapters.base_adapter
1from __future__ import annotations 2 3import json 4import uuid 5from abc import ABCMeta, abstractmethod 6from dataclasses import dataclass 7from typing import TYPE_CHECKING, AsyncIterator, Dict, Tuple 8 9from litellm.types.utils import ModelResponseStream 10 11from kiln_ai.adapters.chat.chat_formatter import ( 12 ChatFormatter, 13 MultiturnFormatter, 14 get_chat_formatter, 15) 16from kiln_ai.adapters.errors import KilnRunError, format_error_message 17from kiln_ai.adapters.ml_model_list import ( 18 KilnModelProvider, 19 StructuredOutputMode, 20 default_structured_output_mode_for_model_provider, 21) 22from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStreamResult 23from kiln_ai.adapters.model_adapters.stream_events import ( 24 AiSdkStreamConverter, 25 AiSdkStreamEvent, 26 FinishEvent, 27 FinishMessageMetadata, 28 FinishStepEvent, 29 StartEvent, 30 StartStepEvent, 31 ToolCallEvent, 32) 33from kiln_ai.adapters.parsers.json_parser import parse_json_string 34from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id 35from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id 36from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id 37from kiln_ai.adapters.provider_tools import kiln_model_provider_from 38from kiln_ai.adapters.run_output import RunOutput 39from kiln_ai.datamodel import ( 40 DataSource, 41 DataSourceType, 42 MessageUsage, 43 Task, 44 TaskOutput, 45 TaskRun, 46 Usage, 47) 48from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType 49from kiln_ai.datamodel.json_schema import validate_schema_with_value_error 50from kiln_ai.datamodel.run_config import ( 51 KilnAgentRunConfigProperties, 52 as_kiln_agent_run_config, 53) 54from kiln_ai.datamodel.skill import Skill 55from kiln_ai.datamodel.task import RunConfigProperties 56from kiln_ai.datamodel.tool_id import SKILL_TOOL_ID_PREFIX, skill_id_from_tool_id 57 58# Import agent run context for run lifecycle management 
59from kiln_ai.run_context import ( 60 clear_agent_run_id, 61 generate_agent_run_id, 62 get_agent_run_id, 63 set_agent_run_id, 64) 65from kiln_ai.tools import KilnToolInterface 66from kiln_ai.tools.mcp_session_manager import MCPSessionManager 67from kiln_ai.tools.skill_tool import SkillTool 68from kiln_ai.tools.tool_registry import tool_from_id 69from kiln_ai.utils.config import Config 70from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error 71from kiln_ai.utils.jinja_engine import render_input_transform 72from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam 73 74if TYPE_CHECKING: 75 from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStream 76 77SkillsDict = Dict[str, Skill] 78 79 80@dataclass 81class AdapterConfig: 82 """ 83 An adapter config is config options that do NOT impact the output of the model. 84 85 For example: if it's saved, of if we request additional data like logprobs. 86 """ 87 88 allow_saving: bool = True 89 top_logprobs: int | None = None 90 default_tags: list[str] | None = None 91 92 """ 93 The ID of the TaskRunConfig that originated this run, if any. Stored on the 94 resulting TaskRun so the run can be traced back to its originating saved config 95 (in addition to the inline run_config snapshot). None for ad-hoc/inline runs 96 that were not initiated from a saved TaskRunConfig. 97 """ 98 task_run_config_id: str | None = None 99 100 """ 101 A custom prompt builder can be injected to override the system prompt building process. 102 If not provided, the prompt builder will be created from the run_config.prompt_id which 103 may load additional files from disk. 104 """ 105 prompt_builder: BasePromptBuilder | None = None 106 107 """ 108 Pre-loaded skills keyed by skill ID. When the run config references skills, 109 they are looked up from this dict instead of reading from the filesystem. 110 Use load_skills_for_task() to build this dict. 
111 """ 112 skills: SkillsDict | None = None 113 114 """ 115 When True, the adapter will stop and return control to the caller when a tool call 116 is invoked, instead of processing tool calls internally. Default is False (process 117 tool calls internally). 118 """ 119 return_on_tool_call: bool = False 120 121 """ 122 Extra tools provided directly by the caller, in addition to tools resolved from the 123 task's tool registry. These are sent to the model together with registry tools, and 124 their names must not collide with registry tool names. 125 126 If ``return_on_tool_call`` is False (the default), the adapter executes these tools 127 itself just like registry tools. If True, the adapter returns as soon as the model 128 requests a tool call and the caller is responsible for running the tool and passing 129 results back via ``prior_trace``. 130 """ 131 unmanaged_tools: list[KilnToolInterface] | None = None 132 133 """ 134 When True, automatically inject prompt caching hints into completion 135 requests. This is a cost optimization and does not affect model output. 136 """ 137 automatic_prompt_caching: bool = False 138 139 140class BaseAdapter(metaclass=ABCMeta): 141 """Base class for AI model adapters that handle task execution. 142 143 This abstract class provides the foundation for implementing model-specific adapters 144 that can process tasks with structured or unstructured inputs/outputs. It handles 145 input/output validation, prompt building, and run tracking. 146 147 Prompt building is handled internally by the adapter, which uses a prompt builder 148 based on the run config. To override the prompt building behavior, pass a custom prompt 149 builder to the adapter config. 
150 """ 151 152 def __init__( 153 self, 154 task: Task, 155 run_config: RunConfigProperties, 156 config: AdapterConfig | None = None, 157 ): 158 self.task = task 159 self.run_config: RunConfigProperties = run_config 160 self.base_adapter_config = config or AdapterConfig() 161 162 if isinstance(run_config, KilnAgentRunConfigProperties): 163 self.update_run_config_unknown_structured_output_mode() 164 self.prompt_builder = ( 165 self.base_adapter_config.prompt_builder 166 or prompt_builder_from_id(run_config.prompt_id, task) 167 ) 168 else: 169 self.prompt_builder = None 170 self._model_provider: KilnModelProvider | None = None 171 self._resolved_skills: list[Skill] | None = None 172 173 self.output_schema = task.output_json_schema 174 self.input_schema = task.input_json_schema 175 176 def model_provider(self) -> KilnModelProvider: 177 """ 178 Lazy load the model provider for this adapter. 179 """ 180 if self._model_provider is not None: 181 return self._model_provider 182 run_config = as_kiln_agent_run_config(self.run_config) 183 if not run_config.model_name or not run_config.model_provider_name: 184 raise ValueError("model_name and model_provider_name must be provided") 185 self._model_provider = kiln_model_provider_from( 186 run_config.model_name, run_config.model_provider_name 187 ) 188 if not self._model_provider: 189 raise ValueError( 190 f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}" 191 ) 192 return self._model_provider 193 194 @staticmethod 195 def _normalize_prior_trace( 196 prior_trace: list[ChatCompletionMessageParam] | None, 197 ) -> list[ChatCompletionMessageParam] | None: 198 if not prior_trace: 199 return None 200 return prior_trace 201 202 def _reject_multiturn_with_structured_input( 203 self, 204 prior_trace: list[ChatCompletionMessageParam] | None, 205 ) -> None: 206 if prior_trace is not None and self.input_schema is not None: 207 raise ValueError( 208 "Cannot run multiturn execution with a task 
that has a structured input schema. " 209 "Use an unstructured task, or call without prior_trace." 210 ) 211 212 async def invoke( 213 self, 214 input: InputType, 215 input_source: DataSource | None = None, 216 prior_trace: list[ChatCompletionMessageParam] | None = None, 217 parent_task_run: TaskRun | None = None, 218 ) -> TaskRun: 219 task_run, _ = await self.invoke_returning_run_output( 220 input, input_source, prior_trace, parent_task_run 221 ) 222 return task_run 223 224 async def _run_returning_run_output( 225 self, 226 input: InputType, 227 input_source: DataSource | None = None, 228 prior_trace: list[ChatCompletionMessageParam] | None = None, 229 parent_task_run: TaskRun | None = None, 230 ) -> Tuple[TaskRun, RunOutput]: 231 # Pre-run validation: these checks run before any model call, so 232 # there is no trace to preserve. They stay outside the 233 # exception-wrapping block and surface as plain exceptions. 234 prior_trace = self._normalize_prior_trace(prior_trace) 235 self._reject_multiturn_with_structured_input(prior_trace) 236 237 if self.input_schema is not None: 238 validate_schema_with_value_error( 239 input, 240 self.input_schema, 241 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 242 require_object=False, 243 ) 244 245 # Apply input transform if configured. The original `input` is preserved 246 # for TaskRun.input persistence; `model_input` is what the model sees. 247 model_input = self._apply_input_transform(input) 248 249 # Format model input for model call (we save the original input in the 250 # task without formatting). This runs in the adapter but before any 251 # trace is built, so it also stays outside the wrapped region. 
252 formatted_input = model_input 253 formatter_id = self.model_provider().formatter 254 if formatter_id is not None: 255 formatter = request_formatter_from_id(formatter_id) 256 formatted_input = formatter.format_input(model_input) 257 258 # Allocate the trace-so-far list here so the reference survives any 259 # exception thrown from inside `_run` (or the post-processing that 260 # follows). `_run` must mutate this list in place (extend/append, 261 # or `list[:] = ...`) and never rebind the local name. 262 trace_ref: list[ChatCompletionMessageParam] = [] 263 try: 264 run_output, usage = await self._run( 265 formatted_input, trace_ref, prior_trace=prior_trace 266 ) 267 268 if not run_output.is_toolcall_pending: 269 # Normal completion: parse and validate output 270 provider = self.model_provider() 271 parser = model_parser_from_id(provider.parser) 272 parsed_output = parser.parse_output(original_output=run_output) 273 274 # validate output 275 if self.output_schema is not None: 276 # Parse json to dict if we have structured output 277 if isinstance(parsed_output.output, str): 278 parsed_output.output = parse_json_string(parsed_output.output) 279 280 if not isinstance(parsed_output.output, dict): 281 raise RuntimeError( 282 f"structured response is not a dict: {parsed_output.output}" 283 ) 284 validate_schema_with_value_error( 285 parsed_output.output, 286 self.output_schema, 287 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 288 ) 289 else: 290 if not isinstance(parsed_output.output, str): 291 raise RuntimeError( 292 f"response is not a string for non-structured task: {parsed_output.output}" 293 ) 294 295 trace_has_toolcalls = parsed_output.trace is not None and any( 296 message.get("role", None) == "tool" 297 for message in parsed_output.trace 298 ) 299 300 # Validate reasoning content is present if required. 
301 # Models often skip reasoning on the final turn when tools are involved, so we don't require it then. 302 if ( 303 provider.reasoning_capable 304 and ( 305 not parsed_output.intermediate_outputs 306 or "reasoning" not in parsed_output.intermediate_outputs 307 ) 308 and not ( 309 provider.reasoning_optional_for_structured_output 310 and self.has_structured_output() 311 ) 312 and not trace_has_toolcalls 313 ): 314 raise RuntimeError( 315 "Reasoning is required for this model, but no reasoning was returned." 316 ) 317 318 run_output = parsed_output 319 320 run = self.generate_run( 321 input, 322 input_source, 323 run_output, 324 usage, 325 run_output.trace, 326 parent_task_run, 327 ) 328 329 # Save the run if configured to do so, and we have a path to save to 330 if ( 331 self.base_adapter_config.allow_saving 332 and Config.shared().autosave_runs 333 and self.task.path is not None 334 ): 335 run.save_to_file() 336 else: 337 # Clear the ID to indicate it's not persisted 338 run.id = None 339 340 return run, run_output 341 except KilnRunError: 342 # Already wrapped — pass through so we don't double-wrap. 343 raise 344 except Exception as e: 345 # Trace conversion can itself throw (e.g., a malformed partial 346 # assistant message was appended to `messages` just before the 347 # real failure). Never let that swallow the original exception — 348 # fall back to no trace so the user still sees the real error. 
349 partial_trace: list[ChatCompletionMessageParam] | None = None 350 if trace_ref: 351 try: 352 partial_trace = self._messages_to_trace(trace_ref) 353 except Exception: 354 partial_trace = None 355 raise KilnRunError( 356 message=format_error_message(e), 357 partial_trace=partial_trace, 358 original=e, 359 ) from e 360 361 async def invoke_returning_run_output( 362 self, 363 input: InputType, 364 input_source: DataSource | None = None, 365 prior_trace: list[ChatCompletionMessageParam] | None = None, 366 parent_task_run: TaskRun | None = None, 367 ) -> Tuple[TaskRun, RunOutput]: 368 # Determine if this is the root agent (no existing run context) 369 is_root_agent = get_agent_run_id() is None 370 371 if is_root_agent: 372 run_id = generate_agent_run_id() 373 set_agent_run_id(run_id) 374 375 try: 376 return await self._run_returning_run_output( 377 input, input_source, prior_trace, parent_task_run 378 ) 379 finally: 380 if is_root_agent: 381 try: 382 run_id = get_agent_run_id() 383 if run_id: 384 await MCPSessionManager.shared().cleanup_session(run_id) 385 finally: 386 clear_agent_run_id() 387 388 def invoke_openai_stream( 389 self, 390 input: InputType, 391 input_source: DataSource | None = None, 392 prior_trace: list[ChatCompletionMessageParam] | None = None, 393 parent_task_run: TaskRun | None = None, 394 ) -> OpenAIStreamResult: 395 """Stream raw OpenAI-protocol chunks for the task execution. 396 397 Returns an async-iterable that yields ``ModelResponseStream`` chunks 398 as they arrive from the model. After the iterator is exhausted the 399 run has been validated and saved (when configured). The resulting 400 ``TaskRun`` is available via the ``.task_run`` property. 401 402 Tool-call rounds happen internally and are not surfaced; use 403 ``invoke_ai_sdk_stream`` if you need tool-call events. 
404 """ 405 return OpenAIStreamResult( 406 self, input, input_source, prior_trace, parent_task_run 407 ) 408 409 def invoke_ai_sdk_stream( 410 self, 411 input: InputType, 412 input_source: DataSource | None = None, 413 prior_trace: list[ChatCompletionMessageParam] | None = None, 414 parent_task_run: TaskRun | None = None, 415 ) -> AiSdkStreamResult: 416 """Stream AI SDK protocol events for the task execution. 417 418 Returns an async-iterable that yields ``AiSdkStreamEvent`` instances 419 covering text, reasoning, tool-call lifecycle, step boundaries, and 420 control events. After the iterator is exhausted the resulting 421 ``TaskRun`` is available via the ``.task_run`` property. 422 """ 423 return AiSdkStreamResult( 424 self, input, input_source, prior_trace, parent_task_run 425 ) 426 427 def _prepare_stream( 428 self, 429 input: InputType, 430 prior_trace: list[ChatCompletionMessageParam] | None, 431 ) -> AdapterStream: 432 prior_trace = self._normalize_prior_trace(prior_trace) 433 self._reject_multiturn_with_structured_input(prior_trace) 434 435 if self.input_schema is not None: 436 validate_schema_with_value_error( 437 input, 438 self.input_schema, 439 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 440 require_object=False, 441 ) 442 443 model_input = self._apply_input_transform(input) 444 445 formatted_input = model_input 446 formatter_id = self.model_provider().formatter 447 if formatter_id is not None: 448 formatter = request_formatter_from_id(formatter_id) 449 formatted_input = formatter.format_input(model_input) 450 451 return self._create_run_stream(formatted_input, prior_trace) 452 453 def _finalize_stream( 454 self, 455 adapter_stream: AdapterStream, 456 input: InputType, 457 input_source: DataSource | None, 458 parent_task_run: TaskRun | None = None, 459 ) -> TaskRun: 460 """Streaming invocations are only concerned with passing through events as they come in. 461 At the end of the stream, we still need to validate the output, create a run and everything 462 else that a non-streaming invocation would do. 463 """ 464 465 result: AdapterStreamResult = adapter_stream.result 466 run_output = result.run_output 467 usage = result.usage 468 469 if not run_output.is_toolcall_pending: 470 # Normal completion: parse and validate output 471 provider = self.model_provider() 472 parser = model_parser_from_id(provider.parser) 473 parsed_output = parser.parse_output(original_output=run_output) 474 475 if self.output_schema is not None: 476 if isinstance(parsed_output.output, str): 477 parsed_output.output = parse_json_string(parsed_output.output) 478 if not isinstance(parsed_output.output, dict): 479 raise RuntimeError( 480 f"structured response is not a dict: {parsed_output.output}" 481 ) 482 validate_schema_with_value_error( 483 parsed_output.output, 484 self.output_schema, 485 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 486 ) 487 else: 488 if not isinstance(parsed_output.output, str): 489 raise RuntimeError( 490 f"response is not a string for non-structured task: {parsed_output.output}" 491 ) 492 493 trace_has_toolcalls = parsed_output.trace is not None and any( 494 message.get("role", None) == "tool" for message in parsed_output.trace 495 ) 496 if ( 497 provider.reasoning_capable 498 and ( 499 not parsed_output.intermediate_outputs 500 or "reasoning" not in parsed_output.intermediate_outputs 501 ) 502 and not ( 503 provider.reasoning_optional_for_structured_output 504 and self.has_structured_output() 505 ) 506 and not trace_has_toolcalls 507 ): 508 raise RuntimeError( 509 "Reasoning is required for this model, but no reasoning was returned." 510 ) 511 512 run_output = parsed_output 513 514 run = self.generate_run( 515 input, input_source, run_output, usage, run_output.trace, parent_task_run 516 ) 517 518 if ( 519 self.base_adapter_config.allow_saving 520 and Config.shared().autosave_runs 521 and self.task.path is not None 522 ): 523 run.save_to_file() 524 else: 525 run.id = None 526 527 return run 528 529 def _apply_input_transform(self, input: InputType) -> InputType: 530 """If the run config has an input_transform, render it and return the 531 resulting string. Otherwise return input unchanged. 532 533 MCP run configs (no input_transform field) are a no-op. 
534 """ 535 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 536 return input 537 transform = self.run_config.input_transform 538 if transform is None: 539 return input 540 try: 541 return render_input_transform(transform, input) 542 except Exception as e: 543 raise ValueError(f"Input transform failed: {e}") from e 544 545 def has_structured_output(self) -> bool: 546 return self.output_schema is not None 547 548 @abstractmethod 549 def adapter_name(self) -> str: 550 pass 551 552 @abstractmethod 553 async def _run( 554 self, 555 input: InputType, 556 trace_ref: list[ChatCompletionMessageParam], 557 prior_trace: list[ChatCompletionMessageParam] | None = None, 558 ) -> Tuple[RunOutput, Usage | None]: 559 """Run the model. Implementations MUST mutate `trace_ref` in place 560 (extend/append, or `trace_ref[:] = ...`) — never rebind it — so the 561 caller keeps a live reference to the partial trace if an exception 562 escapes. 563 """ 564 pass 565 566 def _messages_to_trace( 567 self, 568 messages: list[ChatCompletionMessageParam], 569 ) -> list[ChatCompletionMessageParam]: 570 """Convert the adapter's internal `messages` list to an API-safe trace. 571 572 Default implementation returns the list as-is. Adapters that store 573 internal message objects (e.g. LiteLLM's `Message`) should override 574 this to normalize to `ChatCompletionMessageParam` shapes. 575 """ 576 return messages 577 578 def _create_run_stream( 579 self, 580 input: InputType, 581 prior_trace: list[ChatCompletionMessageParam] | None = None, 582 ) -> AdapterStream: 583 """Create a stream for the adapter. 
Implementations must override this method to support streaming.""" 584 raise NotImplementedError("Streaming is not supported for this adapter type") 585 586 def build_prompt(self) -> str: 587 if self.prompt_builder is None: 588 raise ValueError("Prompt builder is not available for MCP run config") 589 # The prompt builder needs to know if we want to inject formatting instructions 590 structured_output_mode = as_kiln_agent_run_config( 591 self.run_config 592 ).structured_output_mode 593 add_json_instructions = self.has_structured_output() and ( 594 structured_output_mode == StructuredOutputMode.json_instructions 595 or structured_output_mode 596 == StructuredOutputMode.json_instruction_and_object 597 ) 598 599 return self.prompt_builder.build_prompt( 600 include_json_instructions=add_json_instructions, 601 skills=self._resolve_skills(), 602 ) 603 604 def _resolve_skills(self) -> list[Skill]: 605 """Resolve skills from the injected skills dict. 606 607 Uses the pre-loaded skills dict from AdapterConfig. Caches the result 608 so that build_prompt and available_tools don't repeat 609 the lookup. Raises ValueError if the run config references a skill 610 that is not in the injected dict. 611 """ 612 if self._resolved_skills is not None: 613 return self._resolved_skills 614 615 if self.run_config.type != "kiln_agent": 616 self._resolved_skills = [] 617 return self._resolved_skills 618 619 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 620 if tool_config is None or tool_config.tools is None: 621 self._resolved_skills = [] 622 return self._resolved_skills 623 624 skill_tool_ids = [ 625 tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX) 626 ] 627 if not skill_tool_ids: 628 self._resolved_skills = [] 629 return self._resolved_skills 630 631 injected = self.base_adapter_config.skills 632 if injected is None: 633 raise ValueError( 634 "Run config references skills but no skills dict was provided via " 635 "AdapterConfig(skills=...). 
Use load_skills_for_task() to pre-load " 636 "skills and pass them to the adapter." 637 ) 638 639 skills: list[Skill] = [] 640 seen: set[str] = set() 641 for tool_id in skill_tool_ids: 642 sid = skill_id_from_tool_id(tool_id) 643 if sid not in injected: 644 raise ValueError( 645 f"Skill {sid} referenced in run config but not found in the " 646 "injected skills dict." 647 ) 648 if sid in seen: 649 continue 650 seen.add(sid) 651 skills.append(injected[sid]) 652 653 self._resolved_skills = skills 654 return self._resolved_skills 655 656 def build_chat_formatter( 657 self, 658 input: InputType, 659 prior_trace: list[ChatCompletionMessageParam] | None = None, 660 ) -> ChatFormatter: 661 prior_trace = self._normalize_prior_trace(prior_trace) 662 self._reject_multiturn_with_structured_input(prior_trace) 663 if prior_trace is not None: 664 return MultiturnFormatter(prior_trace, input) 665 if self.prompt_builder is None: 666 raise ValueError("Prompt builder is not available for MCP run config") 667 # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy. 668 669 cot_prompt = self.prompt_builder.chain_of_thought_prompt() 670 system_message = self.build_prompt() 671 672 # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt. 673 if not cot_prompt: 674 return get_chat_formatter( 675 strategy=ChatStrategy.single_turn, 676 system_message=system_message, 677 user_input=input, 678 ) 679 680 # Some models like finetunes are trained with a specific chat strategy. Use that. 681 # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy. 
682 tuned_chat_strategy = self.model_provider().tuned_chat_strategy 683 if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn: 684 return get_chat_formatter( 685 strategy=tuned_chat_strategy, 686 system_message=system_message, 687 user_input=input, 688 thinking_instructions=cot_prompt, 689 ) 690 691 # Pick the best chat strategy for the model given it has a cot prompt. 692 reasoning_capable = self.model_provider().reasoning_capable 693 if reasoning_capable: 694 # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format. 695 # A simple message with the COT prompt appended to the message list is sufficient 696 return get_chat_formatter( 697 strategy=ChatStrategy.single_turn_r1_thinking, 698 system_message=system_message, 699 user_input=input, 700 thinking_instructions=cot_prompt, 701 ) 702 else: 703 # Unstructured output with COT 704 # Two calls to separate the thinking from the final response 705 return get_chat_formatter( 706 strategy=ChatStrategy.two_message_cot, 707 system_message=system_message, 708 user_input=input, 709 thinking_instructions=cot_prompt, 710 ) 711 712 # create a run and task output 713 def generate_run( 714 self, 715 input: InputType, 716 input_source: DataSource | None, 717 run_output: RunOutput, 718 usage: Usage | None = None, 719 trace: list[ChatCompletionMessageParam] | None = None, 720 parent_task_run: TaskRun | None = None, 721 ) -> TaskRun: 722 output_str = ( 723 json.dumps(run_output.output, ensure_ascii=False) 724 if isinstance(run_output.output, dict) 725 else run_output.output 726 ) 727 728 output_source_type = ( 729 DataSourceType.tool_call 730 if self.run_config.type == "mcp" 731 else DataSourceType.synthetic 732 ) 733 734 new_output = TaskOutput( 735 output=output_str, 736 source=DataSource( 737 type=output_source_type, 738 properties=self._properties_for_task_output(), 739 run_config_id=self.base_adapter_config.task_run_config_id, 740 run_config=self.run_config, 741 ), 742 
) 743 744 # Convert input and output to JSON strings if they aren't strings 745 input_str = ( 746 input if isinstance(input, str) else json.dumps(input, ensure_ascii=False) 747 ) 748 749 if input_source is None: 750 input_source = DataSource( 751 type=DataSourceType.human, 752 properties={"created_by": Config.shared().user_id}, 753 ) 754 755 parent_task_run_id: str | None = None 756 if parent_task_run is not None: 757 if parent_task_run.id is None: 758 raise ValueError( 759 "parent_task_run must be persisted before using as parent: save the parent " 760 "TaskRun (e.g. save_to_file()) so it has a stable id." 761 ) 762 parent_task_run_id = parent_task_run.id 763 764 return TaskRun( 765 parent=self.task, 766 parent_task_run_id=parent_task_run_id, 767 input=input_str, 768 input_source=input_source, 769 output=new_output, 770 intermediate_outputs=run_output.intermediate_outputs, 771 tags=self.base_adapter_config.default_tags or [], 772 usage=usage, 773 trace=trace, 774 cumulative_usage=MessageUsage.from_trace(trace), 775 ) 776 777 def _properties_for_task_output(self) -> Dict[str, str | int | float]: 778 match self.run_config.type: 779 case "mcp": 780 return {} 781 case "kiln_agent": 782 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 783 raise ValueError("Kiln agent run config is required") 784 run_config = self.run_config 785 786 props: Dict[str, str | int | float] = {} 787 props["adapter_name"] = self.adapter_name() 788 # Legacy properties where we save the run_config details into custom properties. 789 # These are now also be saved in the run_config field. 
790 props["model_name"] = run_config.model_name 791 props["model_provider"] = run_config.model_provider_name 792 props["prompt_id"] = run_config.prompt_id 793 props["structured_output_mode"] = run_config.structured_output_mode 794 props["temperature"] = run_config.temperature 795 props["top_p"] = run_config.top_p 796 797 return props 798 case _: 799 raise_exhaustive_enum_error(self.run_config.type) 800 801 def update_run_config_unknown_structured_output_mode(self) -> None: 802 if self.run_config.type != "kiln_agent": 803 return 804 run_config = as_kiln_agent_run_config(self.run_config) 805 structured_output_mode = run_config.structured_output_mode 806 807 # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it. 808 # Look up our recommended mode from ml_model_list if we have one 809 if structured_output_mode == StructuredOutputMode.unknown: 810 new_run_config = run_config.model_copy(deep=True) 811 structured_output_mode = default_structured_output_mode_for_model_provider( 812 run_config.model_name, 813 run_config.model_provider_name, 814 ) 815 new_run_config.structured_output_mode = structured_output_mode 816 self.run_config = new_run_config 817 818 async def available_tools(self) -> list[KilnToolInterface]: 819 if self.run_config.type != "kiln_agent": 820 return [] 821 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 822 if tool_config is None or tool_config.tools is None: 823 return [] 824 825 non_skill_tool_ids = [ 826 tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX) 827 ] 828 829 tools: list[KilnToolInterface] = [ 830 tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids 831 ] 832 833 skills = self._resolve_skills() 834 if skills: 835 seen_names: set[str] = set() 836 for skill in skills: 837 if skill.name in seen_names: 838 raise ValueError( 839 f"Duplicate skill name '{skill.name}'. Each skill must have a unique name." 
840 ) 841 seen_names.add(skill.name) 842 tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills)) 843 844 tool_names = [await tool.name() for tool in tools] 845 if len(tool_names) != len(set(tool_names)): 846 raise ValueError( 847 "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined." 848 ) 849 850 return tools 851 852 853class OpenAIStreamResult: 854 """Async-iterable wrapper around the OpenAI streaming flow. 855 856 Yields ``ModelResponseStream`` chunks. After iteration the resulting 857 ``TaskRun`` is available via the ``.task_run`` property. 858 859 When return_on_tool_call=True and the model requests tool calls, the stream 860 will stop and ``task_run.is_toolcall_pending`` will be True. 861 """ 862 863 def __init__( 864 self, 865 adapter: BaseAdapter, 866 input: InputType, 867 input_source: DataSource | None, 868 prior_trace: list[ChatCompletionMessageParam] | None, 869 parent_task_run: TaskRun | None = None, 870 ) -> None: 871 self._adapter = adapter 872 self._input = input 873 self._input_source = input_source 874 self._prior_trace = prior_trace 875 self._parent_task_run = parent_task_run 876 self._task_run: TaskRun | None = None 877 878 @property 879 def task_run(self) -> TaskRun: 880 if self._task_run is None: 881 raise RuntimeError( 882 "Stream has not been fully consumed yet. 
" 883 "Iterate over the stream before accessing .task_run" 884 ) 885 return self._task_run 886 887 async def __aiter__(self) -> AsyncIterator[ModelResponseStream]: 888 self._task_run = None 889 is_root_agent = get_agent_run_id() is None 890 if is_root_agent: 891 set_agent_run_id(generate_agent_run_id()) 892 893 try: 894 adapter_stream = self._adapter._prepare_stream( 895 self._input, self._prior_trace 896 ) 897 898 async for event in adapter_stream: 899 if isinstance(event, ModelResponseStream): 900 yield event 901 902 self._task_run = self._adapter._finalize_stream( 903 adapter_stream, self._input, self._input_source, self._parent_task_run 904 ) 905 finally: 906 if is_root_agent: 907 try: 908 run_id = get_agent_run_id() 909 if run_id: 910 await MCPSessionManager.shared().cleanup_session(run_id) 911 finally: 912 clear_agent_run_id() 913 914 915class AiSdkStreamResult: 916 """Async-iterable wrapper around the AI SDK streaming flow. 917 918 Yields ``AiSdkStreamEvent`` instances. After iteration the resulting 919 ``TaskRun`` is available via the ``.task_run`` property. 920 921 When return_on_tool_call=True and the model requests tool calls, the FINISH 922 event will have finishReason: "tool-calls" and ``task_run.is_toolcall_pending`` 923 will be True. 924 """ 925 926 def __init__( 927 self, 928 adapter: BaseAdapter, 929 input: InputType, 930 input_source: DataSource | None, 931 prior_trace: list[ChatCompletionMessageParam] | None, 932 parent_task_run: TaskRun | None = None, 933 ) -> None: 934 self._adapter = adapter 935 self._input = input 936 self._input_source = input_source 937 self._prior_trace = prior_trace 938 self._parent_task_run = parent_task_run 939 self._task_run: TaskRun | None = None 940 941 @property 942 def task_run(self) -> TaskRun: 943 if self._task_run is None: 944 raise RuntimeError( 945 "Stream has not been fully consumed yet. 
" 946 "Iterate over the stream before accessing .task_run" 947 ) 948 return self._task_run 949 950 async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]: 951 self._task_run = None 952 is_root_agent = get_agent_run_id() is None 953 if is_root_agent: 954 set_agent_run_id(generate_agent_run_id()) 955 956 try: 957 adapter_stream = self._adapter._prepare_stream( 958 self._input, self._prior_trace 959 ) 960 961 message_id = f"msg-{uuid.uuid4().hex}" 962 converter = AiSdkStreamConverter() 963 964 yield StartEvent(messageId=message_id) 965 yield StartStepEvent() 966 967 last_event_was_tool_call = False 968 async for event in adapter_stream: 969 if isinstance(event, ModelResponseStream): 970 if last_event_was_tool_call: 971 converter.reset_for_next_step() 972 last_event_was_tool_call = False 973 for ai_event in converter.convert_chunk(event): 974 yield ai_event 975 elif isinstance(event, ToolCallEvent): 976 last_event_was_tool_call = True 977 for ai_event in converter.convert_tool_event(event): 978 yield ai_event 979 980 for ai_event in converter.close_open_blocks(): 981 yield ai_event 982 983 yield FinishStepEvent() 984 985 self._task_run = self._adapter._finalize_stream( 986 adapter_stream, self._input, self._input_source, self._parent_task_run 987 ) 988 989 if self._task_run.is_toolcall_pending: 990 yield FinishEvent( 991 messageMetadata=FinishMessageMetadata(finishReason="tool-calls"), 992 ) 993 else: 994 for ai_event in converter.finalize(): 995 yield ai_event 996 finally: 997 if is_root_agent: 998 try: 999 run_id = get_agent_run_id() 1000 if run_id: 1001 await MCPSessionManager.shared().cleanup_session(run_id) 1002 finally: 1003 clear_agent_run_id()
@dataclass
class AdapterConfig:
    """
    Adapter options that do NOT change what the model outputs.

    Examples: whether the resulting run is saved, or whether additional data
    such as logprobs is requested.
    """

    # When False, runs are never saved. When True, saving still also requires
    # autosave to be enabled and the task to have a path on disk.
    allow_saving: bool = True

    # Optional top-logprobs count (logprobs are extra requested data, not a
    # change in output). None leaves it unset.
    top_logprobs: int | None = None

    # Tags applied to every generated TaskRun. None means no tags.
    default_tags: list[str] | None = None

    # The ID of the TaskRunConfig that originated this run, if any. Stored on the
    # resulting TaskRun so the run can be traced back to its originating saved config
    # (in addition to the inline run_config snapshot). None for ad-hoc/inline runs
    # that were not initiated from a saved TaskRunConfig.
    task_run_config_id: str | None = None

    # A custom prompt builder can be injected to override the system prompt building
    # process. If not provided, the prompt builder will be created from the
    # run_config.prompt_id, which may load additional files from disk.
    prompt_builder: BasePromptBuilder | None = None

    # Pre-loaded skills keyed by skill ID. When the run config references skills,
    # they are looked up from this dict instead of reading from the filesystem.
    # Use load_skills_for_task() to build this dict.
    skills: SkillsDict | None = None

    # When True, the adapter stops and returns control to the caller when a tool
    # call is invoked, instead of processing tool calls internally. Default is
    # False (process tool calls internally).
    return_on_tool_call: bool = False

    # Extra tools provided directly by the caller, in addition to tools resolved from
    # the task's tool registry. These are sent to the model together with registry
    # tools, and their names must not collide with registry tool names.
    #
    # If ``return_on_tool_call`` is False (the default), the adapter executes these
    # tools itself just like registry tools. If True, the adapter returns as soon as
    # the model requests a tool call, and the caller is responsible for running the
    # tool and passing results back via ``prior_trace``.
    unmanaged_tools: list[KilnToolInterface] | None = None

    # When True, automatically inject prompt caching hints into completion
    # requests. This is a cost optimization and does not affect model output.
    automatic_prompt_caching: bool = False
An adapter config holds options that do NOT impact the output of the model.
For example: whether the run is saved, or whether we request additional data like logprobs.
A custom prompt builder can be injected to override the system prompt building process. If not provided, the prompt builder will be created from the run_config.prompt_id which may load additional files from disk.
Pre-loaded skills keyed by skill ID. When the run config references skills, they are looked up from this dict instead of reading from the filesystem. Use load_skills_for_task() to build this dict.
When True, the adapter will stop and return control to the caller when a tool call is invoked, instead of processing tool calls internally. Default is False (process tool calls internally).
Extra tools provided directly by the caller, in addition to tools resolved from the task's tool registry. These are sent to the model together with registry tools, and their names must not collide with registry tool names.
If return_on_tool_call is False (the default), the adapter executes these tools
itself just like registry tools. If True, the adapter returns as soon as the model
requests a tool call and the caller is responsible for running the tool and passing
results back via prior_trace.
141class BaseAdapter(metaclass=ABCMeta): 142 """Base class for AI model adapters that handle task execution. 143 144 This abstract class provides the foundation for implementing model-specific adapters 145 that can process tasks with structured or unstructured inputs/outputs. It handles 146 input/output validation, prompt building, and run tracking. 147 148 Prompt building is handled internally by the adapter, which uses a prompt builder 149 based on the run config. To override the prompt building behavior, pass a custom prompt 150 builder to the adapter config. 151 """ 152 153 def __init__( 154 self, 155 task: Task, 156 run_config: RunConfigProperties, 157 config: AdapterConfig | None = None, 158 ): 159 self.task = task 160 self.run_config: RunConfigProperties = run_config 161 self.base_adapter_config = config or AdapterConfig() 162 163 if isinstance(run_config, KilnAgentRunConfigProperties): 164 self.update_run_config_unknown_structured_output_mode() 165 self.prompt_builder = ( 166 self.base_adapter_config.prompt_builder 167 or prompt_builder_from_id(run_config.prompt_id, task) 168 ) 169 else: 170 self.prompt_builder = None 171 self._model_provider: KilnModelProvider | None = None 172 self._resolved_skills: list[Skill] | None = None 173 174 self.output_schema = task.output_json_schema 175 self.input_schema = task.input_json_schema 176 177 def model_provider(self) -> KilnModelProvider: 178 """ 179 Lazy load the model provider for this adapter. 
180 """ 181 if self._model_provider is not None: 182 return self._model_provider 183 run_config = as_kiln_agent_run_config(self.run_config) 184 if not run_config.model_name or not run_config.model_provider_name: 185 raise ValueError("model_name and model_provider_name must be provided") 186 self._model_provider = kiln_model_provider_from( 187 run_config.model_name, run_config.model_provider_name 188 ) 189 if not self._model_provider: 190 raise ValueError( 191 f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}" 192 ) 193 return self._model_provider 194 195 @staticmethod 196 def _normalize_prior_trace( 197 prior_trace: list[ChatCompletionMessageParam] | None, 198 ) -> list[ChatCompletionMessageParam] | None: 199 if not prior_trace: 200 return None 201 return prior_trace 202 203 def _reject_multiturn_with_structured_input( 204 self, 205 prior_trace: list[ChatCompletionMessageParam] | None, 206 ) -> None: 207 if prior_trace is not None and self.input_schema is not None: 208 raise ValueError( 209 "Cannot run multiturn execution with a task that has a structured input schema. " 210 "Use an unstructured task, or call without prior_trace." 211 ) 212 213 async def invoke( 214 self, 215 input: InputType, 216 input_source: DataSource | None = None, 217 prior_trace: list[ChatCompletionMessageParam] | None = None, 218 parent_task_run: TaskRun | None = None, 219 ) -> TaskRun: 220 task_run, _ = await self.invoke_returning_run_output( 221 input, input_source, prior_trace, parent_task_run 222 ) 223 return task_run 224 225 async def _run_returning_run_output( 226 self, 227 input: InputType, 228 input_source: DataSource | None = None, 229 prior_trace: list[ChatCompletionMessageParam] | None = None, 230 parent_task_run: TaskRun | None = None, 231 ) -> Tuple[TaskRun, RunOutput]: 232 # Pre-run validation: these checks run before any model call, so 233 # there is no trace to preserve. 
They stay outside the 234 # exception-wrapping block and surface as plain exceptions. 235 prior_trace = self._normalize_prior_trace(prior_trace) 236 self._reject_multiturn_with_structured_input(prior_trace) 237 238 if self.input_schema is not None: 239 validate_schema_with_value_error( 240 input, 241 self.input_schema, 242 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 243 require_object=False, 244 ) 245 246 # Apply input transform if configured. The original `input` is preserved 247 # for TaskRun.input persistence; `model_input` is what the model sees. 248 model_input = self._apply_input_transform(input) 249 250 # Format model input for model call (we save the original input in the 251 # task without formatting). This runs in the adapter but before any 252 # trace is built, so it also stays outside the wrapped region. 253 formatted_input = model_input 254 formatter_id = self.model_provider().formatter 255 if formatter_id is not None: 256 formatter = request_formatter_from_id(formatter_id) 257 formatted_input = formatter.format_input(model_input) 258 259 # Allocate the trace-so-far list here so the reference survives any 260 # exception thrown from inside `_run` (or the post-processing that 261 # follows). `_run` must mutate this list in place (extend/append, 262 # or `list[:] = ...`) and never rebind the local name. 
263 trace_ref: list[ChatCompletionMessageParam] = [] 264 try: 265 run_output, usage = await self._run( 266 formatted_input, trace_ref, prior_trace=prior_trace 267 ) 268 269 if not run_output.is_toolcall_pending: 270 # Normal completion: parse and validate output 271 provider = self.model_provider() 272 parser = model_parser_from_id(provider.parser) 273 parsed_output = parser.parse_output(original_output=run_output) 274 275 # validate output 276 if self.output_schema is not None: 277 # Parse json to dict if we have structured output 278 if isinstance(parsed_output.output, str): 279 parsed_output.output = parse_json_string(parsed_output.output) 280 281 if not isinstance(parsed_output.output, dict): 282 raise RuntimeError( 283 f"structured response is not a dict: {parsed_output.output}" 284 ) 285 validate_schema_with_value_error( 286 parsed_output.output, 287 self.output_schema, 288 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 289 ) 290 else: 291 if not isinstance(parsed_output.output, str): 292 raise RuntimeError( 293 f"response is not a string for non-structured task: {parsed_output.output}" 294 ) 295 296 trace_has_toolcalls = parsed_output.trace is not None and any( 297 message.get("role", None) == "tool" 298 for message in parsed_output.trace 299 ) 300 301 # Validate reasoning content is present if required. 302 # Models often skip reasoning on the final turn when tools are involved, so we don't require it then. 303 if ( 304 provider.reasoning_capable 305 and ( 306 not parsed_output.intermediate_outputs 307 or "reasoning" not in parsed_output.intermediate_outputs 308 ) 309 and not ( 310 provider.reasoning_optional_for_structured_output 311 and self.has_structured_output() 312 ) 313 and not trace_has_toolcalls 314 ): 315 raise RuntimeError( 316 "Reasoning is required for this model, but no reasoning was returned." 
317 ) 318 319 run_output = parsed_output 320 321 run = self.generate_run( 322 input, 323 input_source, 324 run_output, 325 usage, 326 run_output.trace, 327 parent_task_run, 328 ) 329 330 # Save the run if configured to do so, and we have a path to save to 331 if ( 332 self.base_adapter_config.allow_saving 333 and Config.shared().autosave_runs 334 and self.task.path is not None 335 ): 336 run.save_to_file() 337 else: 338 # Clear the ID to indicate it's not persisted 339 run.id = None 340 341 return run, run_output 342 except KilnRunError: 343 # Already wrapped — pass through so we don't double-wrap. 344 raise 345 except Exception as e: 346 # Trace conversion can itself throw (e.g., a malformed partial 347 # assistant message was appended to `messages` just before the 348 # real failure). Never let that swallow the original exception — 349 # fall back to no trace so the user still sees the real error. 350 partial_trace: list[ChatCompletionMessageParam] | None = None 351 if trace_ref: 352 try: 353 partial_trace = self._messages_to_trace(trace_ref) 354 except Exception: 355 partial_trace = None 356 raise KilnRunError( 357 message=format_error_message(e), 358 partial_trace=partial_trace, 359 original=e, 360 ) from e 361 362 async def invoke_returning_run_output( 363 self, 364 input: InputType, 365 input_source: DataSource | None = None, 366 prior_trace: list[ChatCompletionMessageParam] | None = None, 367 parent_task_run: TaskRun | None = None, 368 ) -> Tuple[TaskRun, RunOutput]: 369 # Determine if this is the root agent (no existing run context) 370 is_root_agent = get_agent_run_id() is None 371 372 if is_root_agent: 373 run_id = generate_agent_run_id() 374 set_agent_run_id(run_id) 375 376 try: 377 return await self._run_returning_run_output( 378 input, input_source, prior_trace, parent_task_run 379 ) 380 finally: 381 if is_root_agent: 382 try: 383 run_id = get_agent_run_id() 384 if run_id: 385 await MCPSessionManager.shared().cleanup_session(run_id) 386 finally: 387 
clear_agent_run_id() 388 389 def invoke_openai_stream( 390 self, 391 input: InputType, 392 input_source: DataSource | None = None, 393 prior_trace: list[ChatCompletionMessageParam] | None = None, 394 parent_task_run: TaskRun | None = None, 395 ) -> OpenAIStreamResult: 396 """Stream raw OpenAI-protocol chunks for the task execution. 397 398 Returns an async-iterable that yields ``ModelResponseStream`` chunks 399 as they arrive from the model. After the iterator is exhausted the 400 run has been validated and saved (when configured). The resulting 401 ``TaskRun`` is available via the ``.task_run`` property. 402 403 Tool-call rounds happen internally and are not surfaced; use 404 ``invoke_ai_sdk_stream`` if you need tool-call events. 405 """ 406 return OpenAIStreamResult( 407 self, input, input_source, prior_trace, parent_task_run 408 ) 409 410 def invoke_ai_sdk_stream( 411 self, 412 input: InputType, 413 input_source: DataSource | None = None, 414 prior_trace: list[ChatCompletionMessageParam] | None = None, 415 parent_task_run: TaskRun | None = None, 416 ) -> AiSdkStreamResult: 417 """Stream AI SDK protocol events for the task execution. 418 419 Returns an async-iterable that yields ``AiSdkStreamEvent`` instances 420 covering text, reasoning, tool-call lifecycle, step boundaries, and 421 control events. After the iterator is exhausted the resulting 422 ``TaskRun`` is available via the ``.task_run`` property. 423 """ 424 return AiSdkStreamResult( 425 self, input, input_source, prior_trace, parent_task_run 426 ) 427 428 def _prepare_stream( 429 self, 430 input: InputType, 431 prior_trace: list[ChatCompletionMessageParam] | None, 432 ) -> AdapterStream: 433 prior_trace = self._normalize_prior_trace(prior_trace) 434 self._reject_multiturn_with_structured_input(prior_trace) 435 436 if self.input_schema is not None: 437 validate_schema_with_value_error( 438 input, 439 self.input_schema, 440 "This task requires a specific input schema. 
While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 441 require_object=False, 442 ) 443 444 model_input = self._apply_input_transform(input) 445 446 formatted_input = model_input 447 formatter_id = self.model_provider().formatter 448 if formatter_id is not None: 449 formatter = request_formatter_from_id(formatter_id) 450 formatted_input = formatter.format_input(model_input) 451 452 return self._create_run_stream(formatted_input, prior_trace) 453 454 def _finalize_stream( 455 self, 456 adapter_stream: AdapterStream, 457 input: InputType, 458 input_source: DataSource | None, 459 parent_task_run: TaskRun | None = None, 460 ) -> TaskRun: 461 """Streaming invocations are only concerned with passing through events as they come in. 462 At the end of the stream, we still need to validate the output, create a run and everything 463 else that a non-streaming invocation would do. 464 """ 465 466 result: AdapterStreamResult = adapter_stream.result 467 run_output = result.run_output 468 usage = result.usage 469 470 if not run_output.is_toolcall_pending: 471 # Normal completion: parse and validate output 472 provider = self.model_provider() 473 parser = model_parser_from_id(provider.parser) 474 parsed_output = parser.parse_output(original_output=run_output) 475 476 if self.output_schema is not None: 477 if isinstance(parsed_output.output, str): 478 parsed_output.output = parse_json_string(parsed_output.output) 479 if not isinstance(parsed_output.output, dict): 480 raise RuntimeError( 481 f"structured response is not a dict: {parsed_output.output}" 482 ) 483 validate_schema_with_value_error( 484 parsed_output.output, 485 self.output_schema, 486 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 487 ) 488 else: 489 if not isinstance(parsed_output.output, str): 490 raise RuntimeError( 491 f"response is not a string for non-structured task: {parsed_output.output}" 492 ) 493 494 trace_has_toolcalls = parsed_output.trace is not None and any( 495 message.get("role", None) == "tool" for message in parsed_output.trace 496 ) 497 if ( 498 provider.reasoning_capable 499 and ( 500 not parsed_output.intermediate_outputs 501 or "reasoning" not in parsed_output.intermediate_outputs 502 ) 503 and not ( 504 provider.reasoning_optional_for_structured_output 505 and self.has_structured_output() 506 ) 507 and not trace_has_toolcalls 508 ): 509 raise RuntimeError( 510 "Reasoning is required for this model, but no reasoning was returned." 511 ) 512 513 run_output = parsed_output 514 515 run = self.generate_run( 516 input, input_source, run_output, usage, run_output.trace, parent_task_run 517 ) 518 519 if ( 520 self.base_adapter_config.allow_saving 521 and Config.shared().autosave_runs 522 and self.task.path is not None 523 ): 524 run.save_to_file() 525 else: 526 run.id = None 527 528 return run 529 530 def _apply_input_transform(self, input: InputType) -> InputType: 531 """If the run config has an input_transform, render it and return the 532 resulting string. Otherwise return input unchanged. 533 534 MCP run configs (no input_transform field) are a no-op. 
535 """ 536 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 537 return input 538 transform = self.run_config.input_transform 539 if transform is None: 540 return input 541 try: 542 return render_input_transform(transform, input) 543 except Exception as e: 544 raise ValueError(f"Input transform failed: {e}") from e 545 546 def has_structured_output(self) -> bool: 547 return self.output_schema is not None 548 549 @abstractmethod 550 def adapter_name(self) -> str: 551 pass 552 553 @abstractmethod 554 async def _run( 555 self, 556 input: InputType, 557 trace_ref: list[ChatCompletionMessageParam], 558 prior_trace: list[ChatCompletionMessageParam] | None = None, 559 ) -> Tuple[RunOutput, Usage | None]: 560 """Run the model. Implementations MUST mutate `trace_ref` in place 561 (extend/append, or `trace_ref[:] = ...`) — never rebind it — so the 562 caller keeps a live reference to the partial trace if an exception 563 escapes. 564 """ 565 pass 566 567 def _messages_to_trace( 568 self, 569 messages: list[ChatCompletionMessageParam], 570 ) -> list[ChatCompletionMessageParam]: 571 """Convert the adapter's internal `messages` list to an API-safe trace. 572 573 Default implementation returns the list as-is. Adapters that store 574 internal message objects (e.g. LiteLLM's `Message`) should override 575 this to normalize to `ChatCompletionMessageParam` shapes. 576 """ 577 return messages 578 579 def _create_run_stream( 580 self, 581 input: InputType, 582 prior_trace: list[ChatCompletionMessageParam] | None = None, 583 ) -> AdapterStream: 584 """Create a stream for the adapter. 
Implementations must override this method to support streaming.""" 585 raise NotImplementedError("Streaming is not supported for this adapter type") 586 587 def build_prompt(self) -> str: 588 if self.prompt_builder is None: 589 raise ValueError("Prompt builder is not available for MCP run config") 590 # The prompt builder needs to know if we want to inject formatting instructions 591 structured_output_mode = as_kiln_agent_run_config( 592 self.run_config 593 ).structured_output_mode 594 add_json_instructions = self.has_structured_output() and ( 595 structured_output_mode == StructuredOutputMode.json_instructions 596 or structured_output_mode 597 == StructuredOutputMode.json_instruction_and_object 598 ) 599 600 return self.prompt_builder.build_prompt( 601 include_json_instructions=add_json_instructions, 602 skills=self._resolve_skills(), 603 ) 604 605 def _resolve_skills(self) -> list[Skill]: 606 """Resolve skills from the injected skills dict. 607 608 Uses the pre-loaded skills dict from AdapterConfig. Caches the result 609 so that build_prompt and available_tools don't repeat 610 the lookup. Raises ValueError if the run config references a skill 611 that is not in the injected dict. 612 """ 613 if self._resolved_skills is not None: 614 return self._resolved_skills 615 616 if self.run_config.type != "kiln_agent": 617 self._resolved_skills = [] 618 return self._resolved_skills 619 620 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 621 if tool_config is None or tool_config.tools is None: 622 self._resolved_skills = [] 623 return self._resolved_skills 624 625 skill_tool_ids = [ 626 tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX) 627 ] 628 if not skill_tool_ids: 629 self._resolved_skills = [] 630 return self._resolved_skills 631 632 injected = self.base_adapter_config.skills 633 if injected is None: 634 raise ValueError( 635 "Run config references skills but no skills dict was provided via " 636 "AdapterConfig(skills=...). 
Use load_skills_for_task() to pre-load " 637 "skills and pass them to the adapter." 638 ) 639 640 skills: list[Skill] = [] 641 seen: set[str] = set() 642 for tool_id in skill_tool_ids: 643 sid = skill_id_from_tool_id(tool_id) 644 if sid not in injected: 645 raise ValueError( 646 f"Skill {sid} referenced in run config but not found in the " 647 "injected skills dict." 648 ) 649 if sid in seen: 650 continue 651 seen.add(sid) 652 skills.append(injected[sid]) 653 654 self._resolved_skills = skills 655 return self._resolved_skills 656 657 def build_chat_formatter( 658 self, 659 input: InputType, 660 prior_trace: list[ChatCompletionMessageParam] | None = None, 661 ) -> ChatFormatter: 662 prior_trace = self._normalize_prior_trace(prior_trace) 663 self._reject_multiturn_with_structured_input(prior_trace) 664 if prior_trace is not None: 665 return MultiturnFormatter(prior_trace, input) 666 if self.prompt_builder is None: 667 raise ValueError("Prompt builder is not available for MCP run config") 668 # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy. 669 670 cot_prompt = self.prompt_builder.chain_of_thought_prompt() 671 system_message = self.build_prompt() 672 673 # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt. 674 if not cot_prompt: 675 return get_chat_formatter( 676 strategy=ChatStrategy.single_turn, 677 system_message=system_message, 678 user_input=input, 679 ) 680 681 # Some models like finetunes are trained with a specific chat strategy. Use that. 682 # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy. 
683 tuned_chat_strategy = self.model_provider().tuned_chat_strategy 684 if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn: 685 return get_chat_formatter( 686 strategy=tuned_chat_strategy, 687 system_message=system_message, 688 user_input=input, 689 thinking_instructions=cot_prompt, 690 ) 691 692 # Pick the best chat strategy for the model given it has a cot prompt. 693 reasoning_capable = self.model_provider().reasoning_capable 694 if reasoning_capable: 695 # "Thinking" LLM designed to output thinking in a structured format. We'll use its native format. 696 # A simple message with the COT prompt appended to the message list is sufficient 697 return get_chat_formatter( 698 strategy=ChatStrategy.single_turn_r1_thinking, 699 system_message=system_message, 700 user_input=input, 701 thinking_instructions=cot_prompt, 702 ) 703 else: 704 # Unstructured output with COT 705 # Two calls to separate the thinking from the final response 706 return get_chat_formatter( 707 strategy=ChatStrategy.two_message_cot, 708 system_message=system_message, 709 user_input=input, 710 thinking_instructions=cot_prompt, 711 ) 712 713 # create a run and task output 714 def generate_run( 715 self, 716 input: InputType, 717 input_source: DataSource | None, 718 run_output: RunOutput, 719 usage: Usage | None = None, 720 trace: list[ChatCompletionMessageParam] | None = None, 721 parent_task_run: TaskRun | None = None, 722 ) -> TaskRun: 723 output_str = ( 724 json.dumps(run_output.output, ensure_ascii=False) 725 if isinstance(run_output.output, dict) 726 else run_output.output 727 ) 728 729 output_source_type = ( 730 DataSourceType.tool_call 731 if self.run_config.type == "mcp" 732 else DataSourceType.synthetic 733 ) 734 735 new_output = TaskOutput( 736 output=output_str, 737 source=DataSource( 738 type=output_source_type, 739 properties=self._properties_for_task_output(), 740 run_config_id=self.base_adapter_config.task_run_config_id, 741 run_config=self.run_config, 742 ), 743 
) 744 745 # Convert input and output to JSON strings if they aren't strings 746 input_str = ( 747 input if isinstance(input, str) else json.dumps(input, ensure_ascii=False) 748 ) 749 750 if input_source is None: 751 input_source = DataSource( 752 type=DataSourceType.human, 753 properties={"created_by": Config.shared().user_id}, 754 ) 755 756 parent_task_run_id: str | None = None 757 if parent_task_run is not None: 758 if parent_task_run.id is None: 759 raise ValueError( 760 "parent_task_run must be persisted before using as parent: save the parent " 761 "TaskRun (e.g. save_to_file()) so it has a stable id." 762 ) 763 parent_task_run_id = parent_task_run.id 764 765 return TaskRun( 766 parent=self.task, 767 parent_task_run_id=parent_task_run_id, 768 input=input_str, 769 input_source=input_source, 770 output=new_output, 771 intermediate_outputs=run_output.intermediate_outputs, 772 tags=self.base_adapter_config.default_tags or [], 773 usage=usage, 774 trace=trace, 775 cumulative_usage=MessageUsage.from_trace(trace), 776 ) 777 778 def _properties_for_task_output(self) -> Dict[str, str | int | float]: 779 match self.run_config.type: 780 case "mcp": 781 return {} 782 case "kiln_agent": 783 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 784 raise ValueError("Kiln agent run config is required") 785 run_config = self.run_config 786 787 props: Dict[str, str | int | float] = {} 788 props["adapter_name"] = self.adapter_name() 789 # Legacy properties where we save the run_config details into custom properties. 790 # These are now also be saved in the run_config field. 
791 props["model_name"] = run_config.model_name 792 props["model_provider"] = run_config.model_provider_name 793 props["prompt_id"] = run_config.prompt_id 794 props["structured_output_mode"] = run_config.structured_output_mode 795 props["temperature"] = run_config.temperature 796 props["top_p"] = run_config.top_p 797 798 return props 799 case _: 800 raise_exhaustive_enum_error(self.run_config.type) 801 802 def update_run_config_unknown_structured_output_mode(self) -> None: 803 if self.run_config.type != "kiln_agent": 804 return 805 run_config = as_kiln_agent_run_config(self.run_config) 806 structured_output_mode = run_config.structured_output_mode 807 808 # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it. 809 # Look up our recommended mode from ml_model_list if we have one 810 if structured_output_mode == StructuredOutputMode.unknown: 811 new_run_config = run_config.model_copy(deep=True) 812 structured_output_mode = default_structured_output_mode_for_model_provider( 813 run_config.model_name, 814 run_config.model_provider_name, 815 ) 816 new_run_config.structured_output_mode = structured_output_mode 817 self.run_config = new_run_config 818 819 async def available_tools(self) -> list[KilnToolInterface]: 820 if self.run_config.type != "kiln_agent": 821 return [] 822 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 823 if tool_config is None or tool_config.tools is None: 824 return [] 825 826 non_skill_tool_ids = [ 827 tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX) 828 ] 829 830 tools: list[KilnToolInterface] = [ 831 tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids 832 ] 833 834 skills = self._resolve_skills() 835 if skills: 836 seen_names: set[str] = set() 837 for skill in skills: 838 if skill.name in seen_names: 839 raise ValueError( 840 f"Duplicate skill name '{skill.name}'. Each skill must have a unique name." 
841 ) 842 seen_names.add(skill.name) 843 tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills)) 844 845 tool_names = [await tool.name() for tool in tools] 846 if len(tool_names) != len(set(tool_names)): 847 raise ValueError( 848 "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined." 849 ) 850 851 return tools
Base class for AI model adapters that handle task execution.
This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.
Prompt building is handled internally by the adapter, which uses a prompt builder based on the run config. To override the prompt building behavior, pass a custom prompt builder to the adapter config.
def model_provider(self) -> KilnModelProvider:
    """Return this adapter's KilnModelProvider, resolving and caching it on first call.

    Raises:
        ValueError: if the run config is missing the model name or provider name,
            or no provider is found for that model/provider pair.
    """
    cached = self._model_provider
    if cached is not None:
        return cached

    agent_config = as_kiln_agent_run_config(self.run_config)
    name = agent_config.model_name
    provider_name = agent_config.model_provider_name
    if not (name and provider_name):
        raise ValueError("model_name and model_provider_name must be provided")

    self._model_provider = kiln_model_provider_from(name, provider_name)
    if not self._model_provider:
        raise ValueError(
            f"model_provider_name {provider_name} not found for model {name}"
        )
    return self._model_provider
Lazily load (and cache) the model provider for this adapter.
async def invoke(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> TaskRun:
    """Run the task and return only the resulting ``TaskRun``.

    Delegates to ``invoke_returning_run_output`` and drops the ``RunOutput``
    half of its ``(TaskRun, RunOutput)`` result.
    """
    finished_run, _discarded_output = await self.invoke_returning_run_output(
        input, input_source, prior_trace, parent_task_run
    )
    return finished_run
async def invoke_returning_run_output(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> Tuple[TaskRun, RunOutput]:
    """Run the task and return both the ``TaskRun`` and the raw ``RunOutput``.

    When ``get_agent_run_id()`` returns None, this call acts as the root agent.
    It generates and sets a new agent run id. When the run ends, whether it
    returns or raises, it cleans up the MCP session for that id and then
    clears the id. When an id is already set, that id is reused and no
    cleanup happens here.
    """
    owns_run_scope = get_agent_run_id() is None
    if owns_run_scope:
        set_agent_run_id(generate_agent_run_id())

    try:
        return await self._run_returning_run_output(
            input, input_source, prior_trace, parent_task_run
        )
    finally:
        if owns_run_scope:
            try:
                active_run_id = get_agent_run_id()
                if active_run_id:
                    await MCPSessionManager.shared().cleanup_session(active_run_id)
            finally:
                # Always clear the id, even if session cleanup raised.
                clear_agent_run_id()
def invoke_openai_stream(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> OpenAIStreamResult:
    """Stream raw OpenAI-protocol chunks for the task execution.

    The returned async iterable yields ``ModelResponseStream`` chunks as they
    arrive from the model. This method only builds the wrapper; the model
    run starts when the result is iterated. Once iteration is exhausted, the
    run has been validated and saved (when configured), and the resulting
    ``TaskRun`` is available via ``.task_run``.

    Tool-call rounds happen internally and are not surfaced. Use
    ``invoke_ai_sdk_stream`` if you need tool-call events.
    """
    stream_result = OpenAIStreamResult(
        adapter=self,
        input=input,
        input_source=input_source,
        prior_trace=prior_trace,
        parent_task_run=parent_task_run,
    )
    return stream_result
Stream raw OpenAI-protocol chunks for the task execution.
Returns an async-iterable that yields ModelResponseStream chunks
as they arrive from the model. After the iterator is exhausted, the
run has been validated and saved (when configured). The resulting
TaskRun is available via the .task_run property.
Tool-call rounds happen internally and are not surfaced; use
invoke_ai_sdk_stream if you need tool-call events.
def invoke_ai_sdk_stream(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> AiSdkStreamResult:
    """Stream AI SDK protocol events for the task execution.

    The returned async iterable yields ``AiSdkStreamEvent`` instances
    covering text, reasoning, tool-call lifecycle, step boundaries, and
    control events. This method only builds the wrapper; the model run
    starts when the result is iterated. Once iteration is exhausted, the
    resulting ``TaskRun`` is available via ``.task_run``.
    """
    stream_result = AiSdkStreamResult(
        adapter=self,
        input=input,
        input_source=input_source,
        prior_trace=prior_trace,
        parent_task_run=parent_task_run,
    )
    return stream_result
Stream AI SDK protocol events for the task execution.
Returns an async-iterable that yields AiSdkStreamEvent instances
covering text, reasoning, tool-call lifecycle, step boundaries, and
control events. After the iterator is exhausted, the resulting
TaskRun is available via the .task_run property.
def build_prompt(self) -> str:
    """Build the system prompt for this run.

    JSON formatting instructions are requested from the prompt builder only
    when the task has structured output and the run config's structured
    output mode is ``json_instructions`` or ``json_instruction_and_object``.
    The resolved skills are always passed to the builder.

    Raises:
        ValueError: if no prompt builder is available.
    """
    builder = self.prompt_builder
    if builder is None:
        raise ValueError("Prompt builder is not available for MCP run config")

    mode = as_kiln_agent_run_config(self.run_config).structured_output_mode
    instruction_modes = (
        StructuredOutputMode.json_instructions,
        StructuredOutputMode.json_instruction_and_object,
    )
    # The prompt builder needs to know whether to inject formatting instructions.
    wants_json_instructions = self.has_structured_output() and (
        mode in instruction_modes
    )

    return builder.build_prompt(
        include_json_instructions=wants_json_instructions,
        skills=self._resolve_skills(),
    )
def build_chat_formatter(
    self,
    input: InputType,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
) -> ChatFormatter:
    """Choose and construct the chat formatter for this run.

    The strategy depends on the prompt the user selected, the model's
    capabilities, and whether the model was fine-tuned with a specific
    chat strategy. Checks run in this order:

    1. A (normalized) prior trace continues the conversation via
       ``MultiturnFormatter``.
    2. Without a chain-of-thought (COT) prompt, use ``single_turn``.
    3. If the provider has a tuned chat strategy other than ``single_turn``,
       use it with the COT prompt as thinking instructions.
    4. Otherwise use ``single_turn_r1_thinking`` for reasoning-capable models
       and ``two_message_cot`` for all others.

    Raises:
        ValueError: if there is no prior trace and no prompt builder.
    """
    prior_trace = self._normalize_prior_trace(prior_trace)
    self._reject_multiturn_with_structured_input(prior_trace)
    if prior_trace is not None:
        return MultiturnFormatter(prior_trace, input)

    builder = self.prompt_builder
    if builder is None:
        raise ValueError("Prompt builder is not available for MCP run config")

    cot_prompt = builder.chain_of_thought_prompt()
    system_message = self.build_prompt()

    # With no COT prompt, use single turn even when a tuned strategy is set.
    # A tuned strategy is either already single turn or won't work without
    # a COT prompt.
    if not cot_prompt:
        return get_chat_formatter(
            strategy=ChatStrategy.single_turn,
            system_message=system_message,
            user_input=input,
        )

    tuned = self.model_provider().tuned_chat_strategy
    if tuned and tuned != ChatStrategy.single_turn:
        # Fine-tuned models were trained with this strategy. A tuned
        # single_turn is ignored because the user explicitly picked a COT
        # prompt, and explicit prompt selection takes priority.
        strategy = tuned
    elif self.model_provider().reasoning_capable:
        # "Thinking" models emit reasoning in a structured format natively,
        # so appending the COT prompt to a single message is sufficient.
        strategy = ChatStrategy.single_turn_r1_thinking
    else:
        # Two calls separate the thinking from the final response.
        strategy = ChatStrategy.two_message_cot

    return get_chat_formatter(
        strategy=strategy,
        system_message=system_message,
        user_input=input,
        thinking_instructions=cot_prompt,
    )
def generate_run(
    self,
    input: InputType,
    input_source: DataSource | None,
    run_output: RunOutput,
    usage: Usage | None = None,
    trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> TaskRun:
    """Construct a ``TaskRun`` for a completed model run (this does not save it).

    - Dict outputs are serialized to JSON; other outputs are used as-is.
    - Non-string inputs are serialized to JSON.
    - The output's data source type is ``tool_call`` for ``mcp`` run configs
      and ``synthetic`` otherwise.
    - If ``input_source`` is None, a ``human`` source with
      ``created_by=Config.shared().user_id`` is used.
    - ``cumulative_usage`` is derived from ``trace`` via
      ``MessageUsage.from_trace``.

    Raises:
        ValueError: if ``parent_task_run`` is given but has no ``id``.
    """
    raw_output = run_output.output
    if isinstance(raw_output, dict):
        output_text = json.dumps(raw_output, ensure_ascii=False)
    else:
        output_text = raw_output

    if self.run_config.type == "mcp":
        source_type = DataSourceType.tool_call
    else:
        source_type = DataSourceType.synthetic

    task_output = TaskOutput(
        output=output_text,
        source=DataSource(
            type=source_type,
            properties=self._properties_for_task_output(),
            run_config_id=self.base_adapter_config.task_run_config_id,
            run_config=self.run_config,
        ),
    )

    if isinstance(input, str):
        input_text = input
    else:
        input_text = json.dumps(input, ensure_ascii=False)

    if input_source is None:
        input_source = DataSource(
            type=DataSourceType.human,
            properties={"created_by": Config.shared().user_id},
        )

    parent_id: str | None = None
    if parent_task_run is not None:
        if parent_task_run.id is None:
            raise ValueError(
                "parent_task_run must be persisted before using as parent: save the parent "
                "TaskRun (e.g. save_to_file()) so it has a stable id."
            )
        parent_id = parent_task_run.id

    return TaskRun(
        parent=self.task,
        parent_task_run_id=parent_id,
        input=input_text,
        input_source=input_source,
        output=task_output,
        intermediate_outputs=run_output.intermediate_outputs,
        tags=self.base_adapter_config.default_tags or [],
        usage=usage,
        trace=trace,
        cumulative_usage=MessageUsage.from_trace(trace),
    )
def update_run_config_unknown_structured_output_mode(self) -> None:
    """Replace an ``unknown`` structured output mode with the recommended default.

    This only applies to ``kiln_agent`` run configs whose mode is
    ``StructuredOutputMode.unknown``. The existing config object is not
    mutated. A deep copy with the mode from
    ``default_structured_output_mode_for_model_provider`` replaces
    ``self.run_config``.
    """
    if self.run_config.type != "kiln_agent":
        return

    current = as_kiln_agent_run_config(self.run_config)
    if current.structured_output_mode != StructuredOutputMode.unknown:
        return

    # Old datamodels didn't save the structured output mode, and some clients
    # (tests, end users) might not set it, so use the ml_model_list recommendation.
    updated = current.model_copy(deep=True)
    updated.structured_output_mode = default_structured_output_mode_for_model_provider(
        current.model_name,
        current.model_provider_name,
    )
    self.run_config = updated
async def available_tools(self) -> list[KilnToolInterface]:
    """Return the tools exposed to the model for this run.

    Returns an empty list for non-``kiln_agent`` run configs, or when there is
    no tools config or tool list. Tool ids that start with
    ``SKILL_TOOL_ID_PREFIX`` are not resolved through ``tool_from_id``.
    Every other id is. If ``_resolve_skills()`` returns any skills, they are
    bundled into a single ``SkillTool``.

    Raises:
        ValueError: if two skills share a name, or if the final tool list
            contains duplicate tool names.
    """
    if self.run_config.type != "kiln_agent":
        return []
    tools_config = as_kiln_agent_run_config(self.run_config).tools_config
    if tools_config is None or tools_config.tools is None:
        return []

    tools: list[KilnToolInterface] = []
    for tool_id in tools_config.tools:
        if tool_id.startswith(SKILL_TOOL_ID_PREFIX):
            continue  # skill ids are not resolved individually here
        tools.append(tool_from_id(tool_id, self.task))

    skills = self._resolve_skills()
    if skills:
        skill_names: set[str] = set()
        for skill in skills:
            if skill.name in skill_names:
                raise ValueError(
                    f"Duplicate skill name '{skill.name}'. Each skill must have a unique name."
                )
            skill_names.add(skill.name)
        tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))

    names = [await tool.name() for tool in tools]
    if len(set(names)) != len(names):
        raise ValueError(
            "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
        )

    return tools
class OpenAIStreamResult:
    """Async-iterable wrapper around the OpenAI streaming flow.

    Yields ``ModelResponseStream`` chunks. After iteration, the resulting
    ``TaskRun`` is available via the ``.task_run`` property.

    When return_on_tool_call=True and the model requests tool calls, the stream
    will stop and ``task_run.is_toolcall_pending`` will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        # Only the run arguments are stored; nothing runs until iteration.
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run
        # Populated by __aiter__ once the stream has been finalized.
        self._task_run: TaskRun | None = None

    @property
    def task_run(self) -> TaskRun:
        """The finalized ``TaskRun``. Raises RuntimeError before iteration completes."""
        finished = self._task_run
        if finished is None:
            raise RuntimeError(
                "Stream has not been fully consumed yet. "
                "Iterate over the stream before accessing .task_run"
            )
        return finished

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        # Each iteration starts a fresh run, so drop any previous result.
        self._task_run = None
        owns_run_scope = get_agent_run_id() is None
        if owns_run_scope:
            set_agent_run_id(generate_agent_run_id())

        try:
            stream = self._adapter._prepare_stream(self._input, self._prior_trace)

            async for item in stream:
                # Only model chunks are surfaced; other adapter events
                # (such as tool-call events) are not yielded here.
                if isinstance(item, ModelResponseStream):
                    yield item

            self._task_run = self._adapter._finalize_stream(
                stream, self._input, self._input_source, self._parent_task_run
            )
        finally:
            if owns_run_scope:
                try:
                    active_run_id = get_agent_run_id()
                    if active_run_id:
                        await MCPSessionManager.shared().cleanup_session(active_run_id)
                finally:
                    # Always clear the id, even if session cleanup raised.
                    clear_agent_run_id()
Async-iterable wrapper around the OpenAI streaming flow.
Yields ModelResponseStream chunks. After iteration the resulting
TaskRun is available via the .task_run property.
When return_on_tool_call=True and the model requests tool calls, the stream
will stop and task_run.is_toolcall_pending will be True.
def __init__(
    self,
    adapter: BaseAdapter,
    input: InputType,
    input_source: DataSource | None,
    prior_trace: list[ChatCompletionMessageParam] | None,
    parent_task_run: TaskRun | None = None,
) -> None:
    """Store the run arguments. No work is done here."""
    # Adapter that will drive the stream.
    self._adapter = adapter
    # Arguments forwarded to the adapter's stream preparation and finalization.
    self._input = input
    self._input_source = input_source
    self._prior_trace = prior_trace
    self._parent_task_run = parent_task_run
    # Result slot; stays None until the stream has been finalized.
    self._task_run: TaskRun | None = None
class AiSdkStreamResult:
    """Async-iterable wrapper around the AI SDK streaming flow.

    Yields ``AiSdkStreamEvent`` instances. After iteration, the resulting
    ``TaskRun`` is available via the ``.task_run`` property.

    When return_on_tool_call=True and the model requests tool calls, the FINISH
    event will have finishReason: "tool-calls" and ``task_run.is_toolcall_pending``
    will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        # Only the run arguments are stored; nothing runs until iteration.
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run
        # Populated by __aiter__ once the stream has been finalized.
        self._task_run: TaskRun | None = None

    @property
    def task_run(self) -> TaskRun:
        """The finalized ``TaskRun``. Raises RuntimeError before iteration completes."""
        finished = self._task_run
        if finished is None:
            raise RuntimeError(
                "Stream has not been fully consumed yet. "
                "Iterate over the stream before accessing .task_run"
            )
        return finished

    async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]:
        # Each iteration starts a fresh run, so drop any previous result.
        self._task_run = None
        owns_run_scope = get_agent_run_id() is None
        if owns_run_scope:
            set_agent_run_id(generate_agent_run_id())

        try:
            stream = self._adapter._prepare_stream(self._input, self._prior_trace)

            message_id = f"msg-{uuid.uuid4().hex}"
            converter = AiSdkStreamConverter()

            yield StartEvent(messageId=message_id)
            yield StartStepEvent()

            after_tool_call = False
            async for item in stream:
                if isinstance(item, ModelResponseStream):
                    if after_tool_call:
                        # Reset converter state before the first model chunk
                        # that follows a tool-call event.
                        converter.reset_for_next_step()
                        after_tool_call = False
                    converted = converter.convert_chunk(item)
                elif isinstance(item, ToolCallEvent):
                    after_tool_call = True
                    converted = converter.convert_tool_event(item)
                else:
                    # Other adapter events are not surfaced.
                    continue
                for out_event in converted:
                    yield out_event

            for out_event in converter.close_open_blocks():
                yield out_event

            yield FinishStepEvent()

            finished = self._adapter._finalize_stream(
                stream, self._input, self._input_source, self._parent_task_run
            )
            self._task_run = finished

            if finished.is_toolcall_pending:
                yield FinishEvent(
                    messageMetadata=FinishMessageMetadata(finishReason="tool-calls"),
                )
            else:
                for out_event in converter.finalize():
                    yield out_event
        finally:
            if owns_run_scope:
                try:
                    active_run_id = get_agent_run_id()
                    if active_run_id:
                        await MCPSessionManager.shared().cleanup_session(active_run_id)
                finally:
                    # Always clear the id, even if session cleanup raised.
                    clear_agent_run_id()
Async-iterable wrapper around the AI SDK streaming flow.
Yields AiSdkStreamEvent instances. After iteration the resulting
TaskRun is available via the .task_run property.
When return_on_tool_call=True and the model requests tool calls, the FINISH
event will have finishReason: "tool-calls" and task_run.is_toolcall_pending
will be True.
def __init__(
    self,
    adapter: BaseAdapter,
    input: InputType,
    input_source: DataSource | None,
    prior_trace: list[ChatCompletionMessageParam] | None,
    parent_task_run: TaskRun | None = None,
) -> None:
    """Store the run arguments. No work is done here."""
    # Adapter that will drive the stream.
    self._adapter = adapter
    # Arguments forwarded to the adapter's stream preparation and finalization.
    self._input = input
    self._input_source = input_source
    self._prior_trace = prior_trace
    self._parent_task_run = parent_task_run
    # Result slot; stays None until the stream has been finalized.
    self._task_run: TaskRun | None = None