kiln_ai.adapters.model_adapters.base_adapter
1from __future__ import annotations 2 3import json 4import uuid 5from abc import ABCMeta, abstractmethod 6from dataclasses import dataclass 7from typing import TYPE_CHECKING, AsyncIterator, Dict, Tuple 8 9from litellm.types.utils import ModelResponseStream 10 11from kiln_ai.adapters.chat.chat_formatter import ( 12 ChatFormatter, 13 MultiturnFormatter, 14 get_chat_formatter, 15) 16from kiln_ai.adapters.ml_model_list import ( 17 KilnModelProvider, 18 StructuredOutputMode, 19 default_structured_output_mode_for_model_provider, 20) 21from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStreamResult 22from kiln_ai.adapters.model_adapters.stream_events import ( 23 AiSdkStreamConverter, 24 AiSdkStreamEvent, 25 FinishEvent, 26 FinishMessageMetadata, 27 FinishStepEvent, 28 StartEvent, 29 StartStepEvent, 30 ToolCallEvent, 31) 32from kiln_ai.adapters.parsers.json_parser import parse_json_string 33from kiln_ai.adapters.parsers.parser_registry import model_parser_from_id 34from kiln_ai.adapters.parsers.request_formatters import request_formatter_from_id 35from kiln_ai.adapters.prompt_builders import BasePromptBuilder, prompt_builder_from_id 36from kiln_ai.adapters.provider_tools import kiln_model_provider_from 37from kiln_ai.adapters.run_output import RunOutput 38from kiln_ai.datamodel import ( 39 DataSource, 40 DataSourceType, 41 MessageUsage, 42 Task, 43 TaskOutput, 44 TaskRun, 45 Usage, 46) 47from kiln_ai.datamodel.datamodel_enums import ChatStrategy, InputType 48from kiln_ai.datamodel.json_schema import validate_schema_with_value_error 49from kiln_ai.datamodel.run_config import ( 50 KilnAgentRunConfigProperties, 51 as_kiln_agent_run_config, 52) 53from kiln_ai.datamodel.skill import Skill 54from kiln_ai.datamodel.task import RunConfigProperties 55from kiln_ai.datamodel.tool_id import SKILL_TOOL_ID_PREFIX, skill_id_from_tool_id 56 57# Import agent run context for run lifecycle management 58from kiln_ai.run_context import ( 59 clear_agent_run_id, 60 
generate_agent_run_id, 61 get_agent_run_id, 62 set_agent_run_id, 63) 64from kiln_ai.tools import KilnToolInterface 65from kiln_ai.tools.mcp_session_manager import MCPSessionManager 66from kiln_ai.tools.skill_tool import SkillTool 67from kiln_ai.tools.tool_registry import tool_from_id 68from kiln_ai.utils.config import Config 69from kiln_ai.utils.exhaustive_error import raise_exhaustive_enum_error 70from kiln_ai.utils.open_ai_types import ChatCompletionMessageParam 71 72if TYPE_CHECKING: 73 from kiln_ai.adapters.model_adapters.adapter_stream import AdapterStream 74 75SkillsDict = Dict[str, Skill] 76 77 78@dataclass 79class AdapterConfig: 80 """ 81 An adapter config is config options that do NOT impact the output of the model. 82 83 For example: if it's saved, of if we request additional data like logprobs. 84 """ 85 86 allow_saving: bool = True 87 top_logprobs: int | None = None 88 default_tags: list[str] | None = None 89 90 """ 91 A custom prompt builder can be injected to override the system prompt building process. 92 If not provided, the prompt builder will be created from the run_config.prompt_id which 93 may load additional files from disk. 94 """ 95 prompt_builder: BasePromptBuilder | None = None 96 97 """ 98 Pre-loaded skills keyed by skill ID. When the run config references skills, 99 they are looked up from this dict instead of reading from the filesystem. 100 Use load_skills_for_task() to build this dict. 101 """ 102 skills: SkillsDict | None = None 103 104 """ 105 When True, the adapter will stop and return control to the caller when a tool call 106 is invoked, instead of processing tool calls internally. Default is False (process 107 tool calls internally). 108 """ 109 return_on_tool_call: bool = False 110 111 """ 112 Extra tools provided directly by the caller, in addition to tools resolved from the 113 task's tool registry. These are sent to the model together with registry tools, and 114 their names must not collide with registry tool names. 
115 116 If ``return_on_tool_call`` is False (the default), the adapter executes these tools 117 itself just like registry tools. If True, the adapter returns as soon as the model 118 requests a tool call and the caller is responsible for running the tool and passing 119 results back via ``prior_trace``. 120 """ 121 unmanaged_tools: list[KilnToolInterface] | None = None 122 123 """ 124 When True, automatically inject prompt caching hints into completion 125 requests. This is a cost optimization and does not affect model output. 126 """ 127 automatic_prompt_caching: bool = False 128 129 130class BaseAdapter(metaclass=ABCMeta): 131 """Base class for AI model adapters that handle task execution. 132 133 This abstract class provides the foundation for implementing model-specific adapters 134 that can process tasks with structured or unstructured inputs/outputs. It handles 135 input/output validation, prompt building, and run tracking. 136 137 Prompt building is handled internally by the adapter, which uses a prompt builder 138 based on the run config. To override the prompt building behavior, pass a custom prompt 139 builder to the adapter config. 
140 """ 141 142 def __init__( 143 self, 144 task: Task, 145 run_config: RunConfigProperties, 146 config: AdapterConfig | None = None, 147 ): 148 self.task = task 149 self.run_config: RunConfigProperties = run_config 150 self.base_adapter_config = config or AdapterConfig() 151 152 if isinstance(run_config, KilnAgentRunConfigProperties): 153 self.update_run_config_unknown_structured_output_mode() 154 self.prompt_builder = ( 155 self.base_adapter_config.prompt_builder 156 or prompt_builder_from_id(run_config.prompt_id, task) 157 ) 158 else: 159 self.prompt_builder = None 160 self._model_provider: KilnModelProvider | None = None 161 self._resolved_skills: list[Skill] | None = None 162 163 self.output_schema = task.output_json_schema 164 self.input_schema = task.input_json_schema 165 166 def model_provider(self) -> KilnModelProvider: 167 """ 168 Lazy load the model provider for this adapter. 169 """ 170 if self._model_provider is not None: 171 return self._model_provider 172 run_config = as_kiln_agent_run_config(self.run_config) 173 if not run_config.model_name or not run_config.model_provider_name: 174 raise ValueError("model_name and model_provider_name must be provided") 175 self._model_provider = kiln_model_provider_from( 176 run_config.model_name, run_config.model_provider_name 177 ) 178 if not self._model_provider: 179 raise ValueError( 180 f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}" 181 ) 182 return self._model_provider 183 184 @staticmethod 185 def _normalize_prior_trace( 186 prior_trace: list[ChatCompletionMessageParam] | None, 187 ) -> list[ChatCompletionMessageParam] | None: 188 if not prior_trace: 189 return None 190 return prior_trace 191 192 def _reject_multiturn_with_structured_input( 193 self, 194 prior_trace: list[ChatCompletionMessageParam] | None, 195 ) -> None: 196 if prior_trace is not None and self.input_schema is not None: 197 raise ValueError( 198 "Cannot run multiturn execution with a task 
that has a structured input schema. " 199 "Use an unstructured task, or call without prior_trace." 200 ) 201 202 async def invoke( 203 self, 204 input: InputType, 205 input_source: DataSource | None = None, 206 prior_trace: list[ChatCompletionMessageParam] | None = None, 207 parent_task_run: TaskRun | None = None, 208 ) -> TaskRun: 209 task_run, _ = await self.invoke_returning_run_output( 210 input, input_source, prior_trace, parent_task_run 211 ) 212 return task_run 213 214 async def _run_returning_run_output( 215 self, 216 input: InputType, 217 input_source: DataSource | None = None, 218 prior_trace: list[ChatCompletionMessageParam] | None = None, 219 parent_task_run: TaskRun | None = None, 220 ) -> Tuple[TaskRun, RunOutput]: 221 prior_trace = self._normalize_prior_trace(prior_trace) 222 self._reject_multiturn_with_structured_input(prior_trace) 223 224 # validate input, allowing arrays 225 if self.input_schema is not None: 226 validate_schema_with_value_error( 227 input, 228 self.input_schema, 229 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 230 require_object=False, 231 ) 232 233 # Format model input for model call (we save the original input in the task without formatting) 234 formatted_input = input 235 formatter_id = self.model_provider().formatter 236 if formatter_id is not None: 237 formatter = request_formatter_from_id(formatter_id) 238 formatted_input = formatter.format_input(input) 239 240 # Run 241 run_output, usage = await self._run(formatted_input, prior_trace=prior_trace) 242 243 if not run_output.is_toolcall_pending: 244 # Normal completion: parse and validate output 245 provider = self.model_provider() 246 parser = model_parser_from_id(provider.parser) 247 parsed_output = parser.parse_output(original_output=run_output) 248 249 # validate output 250 if self.output_schema is not None: 251 # Parse json to dict if we have structured output 252 if isinstance(parsed_output.output, str): 253 parsed_output.output = parse_json_string(parsed_output.output) 254 255 if not isinstance(parsed_output.output, dict): 256 raise RuntimeError( 257 f"structured response is not a dict: {parsed_output.output}" 258 ) 259 validate_schema_with_value_error( 260 parsed_output.output, 261 self.output_schema, 262 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 263 ) 264 else: 265 if not isinstance(parsed_output.output, str): 266 raise RuntimeError( 267 f"response is not a string for non-structured task: {parsed_output.output}" 268 ) 269 270 trace_has_toolcalls = parsed_output.trace is not None and any( 271 message.get("role", None) == "tool" for message in parsed_output.trace 272 ) 273 274 # Validate reasoning content is present and required 275 # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini). 
276 if ( 277 provider.reasoning_capable 278 and ( 279 not parsed_output.intermediate_outputs 280 or "reasoning" not in parsed_output.intermediate_outputs 281 ) 282 and not ( 283 provider.reasoning_optional_for_structured_output 284 and self.has_structured_output() 285 ) 286 and not trace_has_toolcalls 287 ): 288 raise RuntimeError( 289 "Reasoning is required for this model, but no reasoning was returned." 290 ) 291 292 run_output = parsed_output 293 294 run = self.generate_run( 295 input, input_source, run_output, usage, run_output.trace, parent_task_run 296 ) 297 298 # Save the run if configured to do so, and we have a path to save to 299 if ( 300 self.base_adapter_config.allow_saving 301 and Config.shared().autosave_runs 302 and self.task.path is not None 303 ): 304 run.save_to_file() 305 else: 306 # Clear the ID to indicate it's not persisted 307 run.id = None 308 309 return run, run_output 310 311 async def invoke_returning_run_output( 312 self, 313 input: InputType, 314 input_source: DataSource | None = None, 315 prior_trace: list[ChatCompletionMessageParam] | None = None, 316 parent_task_run: TaskRun | None = None, 317 ) -> Tuple[TaskRun, RunOutput]: 318 # Determine if this is the root agent (no existing run context) 319 is_root_agent = get_agent_run_id() is None 320 321 if is_root_agent: 322 run_id = generate_agent_run_id() 323 set_agent_run_id(run_id) 324 325 try: 326 return await self._run_returning_run_output( 327 input, input_source, prior_trace, parent_task_run 328 ) 329 finally: 330 if is_root_agent: 331 try: 332 run_id = get_agent_run_id() 333 if run_id: 334 await MCPSessionManager.shared().cleanup_session(run_id) 335 finally: 336 clear_agent_run_id() 337 338 def invoke_openai_stream( 339 self, 340 input: InputType, 341 input_source: DataSource | None = None, 342 prior_trace: list[ChatCompletionMessageParam] | None = None, 343 parent_task_run: TaskRun | None = None, 344 ) -> OpenAIStreamResult: 345 """Stream raw OpenAI-protocol chunks for the task 
execution. 346 347 Returns an async-iterable that yields ``ModelResponseStream`` chunks 348 as they arrive from the model. After the iterator is exhausted the 349 run has been validated and saved (when configured). The resulting 350 ``TaskRun`` is available via the ``.task_run`` property. 351 352 Tool-call rounds happen internally and are not surfaced; use 353 ``invoke_ai_sdk_stream`` if you need tool-call events. 354 """ 355 return OpenAIStreamResult( 356 self, input, input_source, prior_trace, parent_task_run 357 ) 358 359 def invoke_ai_sdk_stream( 360 self, 361 input: InputType, 362 input_source: DataSource | None = None, 363 prior_trace: list[ChatCompletionMessageParam] | None = None, 364 parent_task_run: TaskRun | None = None, 365 ) -> AiSdkStreamResult: 366 """Stream AI SDK protocol events for the task execution. 367 368 Returns an async-iterable that yields ``AiSdkStreamEvent`` instances 369 covering text, reasoning, tool-call lifecycle, step boundaries, and 370 control events. After the iterator is exhausted the resulting 371 ``TaskRun`` is available via the ``.task_run`` property. 372 """ 373 return AiSdkStreamResult( 374 self, input, input_source, prior_trace, parent_task_run 375 ) 376 377 def _prepare_stream( 378 self, 379 input: InputType, 380 prior_trace: list[ChatCompletionMessageParam] | None, 381 ) -> AdapterStream: 382 prior_trace = self._normalize_prior_trace(prior_trace) 383 self._reject_multiturn_with_structured_input(prior_trace) 384 385 if self.input_schema is not None: 386 validate_schema_with_value_error( 387 input, 388 self.input_schema, 389 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 390 require_object=False, 391 ) 392 393 formatted_input = input 394 formatter_id = self.model_provider().formatter 395 if formatter_id is not None: 396 formatter = request_formatter_from_id(formatter_id) 397 formatted_input = formatter.format_input(input) 398 399 return self._create_run_stream(formatted_input, prior_trace) 400 401 def _finalize_stream( 402 self, 403 adapter_stream: AdapterStream, 404 input: InputType, 405 input_source: DataSource | None, 406 parent_task_run: TaskRun | None = None, 407 ) -> TaskRun: 408 """Streaming invocations are only concerned with passing through events as they come in. 409 At the end of the stream, we still need to validate the output, create a run and everything 410 else that a non-streaming invocation would do. 411 """ 412 413 result: AdapterStreamResult = adapter_stream.result 414 run_output = result.run_output 415 usage = result.usage 416 417 if not run_output.is_toolcall_pending: 418 # Normal completion: parse and validate output 419 provider = self.model_provider() 420 parser = model_parser_from_id(provider.parser) 421 parsed_output = parser.parse_output(original_output=run_output) 422 423 if self.output_schema is not None: 424 if isinstance(parsed_output.output, str): 425 parsed_output.output = parse_json_string(parsed_output.output) 426 if not isinstance(parsed_output.output, dict): 427 raise RuntimeError( 428 f"structured response is not a dict: {parsed_output.output}" 429 ) 430 validate_schema_with_value_error( 431 parsed_output.output, 432 self.output_schema, 433 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 434 ) 435 else: 436 if not isinstance(parsed_output.output, str): 437 raise RuntimeError( 438 f"response is not a string for non-structured task: {parsed_output.output}" 439 ) 440 441 trace_has_toolcalls = parsed_output.trace is not None and any( 442 message.get("role", None) == "tool" for message in parsed_output.trace 443 ) 444 if ( 445 provider.reasoning_capable 446 and ( 447 not parsed_output.intermediate_outputs 448 or "reasoning" not in parsed_output.intermediate_outputs 449 ) 450 and not ( 451 provider.reasoning_optional_for_structured_output 452 and self.has_structured_output() 453 ) 454 and not trace_has_toolcalls 455 ): 456 raise RuntimeError( 457 "Reasoning is required for this model, but no reasoning was returned." 458 ) 459 460 run_output = parsed_output 461 462 run = self.generate_run( 463 input, input_source, run_output, usage, run_output.trace, parent_task_run 464 ) 465 466 if ( 467 self.base_adapter_config.allow_saving 468 and Config.shared().autosave_runs 469 and self.task.path is not None 470 ): 471 run.save_to_file() 472 else: 473 run.id = None 474 475 return run 476 477 def has_structured_output(self) -> bool: 478 return self.output_schema is not None 479 480 @abstractmethod 481 def adapter_name(self) -> str: 482 pass 483 484 @abstractmethod 485 async def _run( 486 self, 487 input: InputType, 488 prior_trace: list[ChatCompletionMessageParam] | None = None, 489 ) -> Tuple[RunOutput, Usage | None]: 490 pass 491 492 def _create_run_stream( 493 self, 494 input: InputType, 495 prior_trace: list[ChatCompletionMessageParam] | None = None, 496 ) -> AdapterStream: 497 """Create a stream for the adapter. 
Implementations must override this method to support streaming.""" 498 raise NotImplementedError("Streaming is not supported for this adapter type") 499 500 def build_prompt(self) -> str: 501 if self.prompt_builder is None: 502 raise ValueError("Prompt builder is not available for MCP run config") 503 # The prompt builder needs to know if we want to inject formatting instructions 504 structured_output_mode = as_kiln_agent_run_config( 505 self.run_config 506 ).structured_output_mode 507 add_json_instructions = self.has_structured_output() and ( 508 structured_output_mode == StructuredOutputMode.json_instructions 509 or structured_output_mode 510 == StructuredOutputMode.json_instruction_and_object 511 ) 512 513 return self.prompt_builder.build_prompt( 514 include_json_instructions=add_json_instructions, 515 skills=self._resolve_skills(), 516 ) 517 518 def _resolve_skills(self) -> list[Skill]: 519 """Resolve skills from the injected skills dict. 520 521 Uses the pre-loaded skills dict from AdapterConfig. Caches the result 522 so that build_prompt and available_tools don't repeat 523 the lookup. Raises ValueError if the run config references a skill 524 that is not in the injected dict. 525 """ 526 if self._resolved_skills is not None: 527 return self._resolved_skills 528 529 if self.run_config.type != "kiln_agent": 530 self._resolved_skills = [] 531 return self._resolved_skills 532 533 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 534 if tool_config is None or tool_config.tools is None: 535 self._resolved_skills = [] 536 return self._resolved_skills 537 538 skill_tool_ids = [ 539 tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX) 540 ] 541 if not skill_tool_ids: 542 self._resolved_skills = [] 543 return self._resolved_skills 544 545 injected = self.base_adapter_config.skills 546 if injected is None: 547 raise ValueError( 548 "Run config references skills but no skills dict was provided via " 549 "AdapterConfig(skills=...). 
Use load_skills_for_task() to pre-load " 550 "skills and pass them to the adapter." 551 ) 552 553 skills: list[Skill] = [] 554 seen: set[str] = set() 555 for tool_id in skill_tool_ids: 556 sid = skill_id_from_tool_id(tool_id) 557 if sid not in injected: 558 raise ValueError( 559 f"Skill {sid} referenced in run config but not found in the " 560 "injected skills dict." 561 ) 562 if sid in seen: 563 continue 564 seen.add(sid) 565 skills.append(injected[sid]) 566 567 self._resolved_skills = skills 568 return self._resolved_skills 569 570 def build_chat_formatter( 571 self, 572 input: InputType, 573 prior_trace: list[ChatCompletionMessageParam] | None = None, 574 ) -> ChatFormatter: 575 prior_trace = self._normalize_prior_trace(prior_trace) 576 self._reject_multiturn_with_structured_input(prior_trace) 577 if prior_trace is not None: 578 return MultiturnFormatter(prior_trace, input) 579 if self.prompt_builder is None: 580 raise ValueError("Prompt builder is not available for MCP run config") 581 # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy. 582 583 cot_prompt = self.prompt_builder.chain_of_thought_prompt() 584 system_message = self.build_prompt() 585 586 # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt. 587 if not cot_prompt: 588 return get_chat_formatter( 589 strategy=ChatStrategy.single_turn, 590 system_message=system_message, 591 user_input=input, 592 ) 593 594 # Some models like finetunes are trained with a specific chat strategy. Use that. 595 # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy. 
596 tuned_chat_strategy = self.model_provider().tuned_chat_strategy 597 if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn: 598 return get_chat_formatter( 599 strategy=tuned_chat_strategy, 600 system_message=system_message, 601 user_input=input, 602 thinking_instructions=cot_prompt, 603 ) 604 605 # Pick the best chat strategy for the model given it has a cot prompt. 606 reasoning_capable = self.model_provider().reasoning_capable 607 if reasoning_capable: 608 # "Thinking" LLM designed to output thinking in a structured format. We'll use it's native format. 609 # A simple message with the COT prompt appended to the message list is sufficient 610 return get_chat_formatter( 611 strategy=ChatStrategy.single_turn_r1_thinking, 612 system_message=system_message, 613 user_input=input, 614 thinking_instructions=cot_prompt, 615 ) 616 else: 617 # Unstructured output with COT 618 # Two calls to separate the thinking from the final response 619 return get_chat_formatter( 620 strategy=ChatStrategy.two_message_cot, 621 system_message=system_message, 622 user_input=input, 623 thinking_instructions=cot_prompt, 624 ) 625 626 # create a run and task output 627 def generate_run( 628 self, 629 input: InputType, 630 input_source: DataSource | None, 631 run_output: RunOutput, 632 usage: Usage | None = None, 633 trace: list[ChatCompletionMessageParam] | None = None, 634 parent_task_run: TaskRun | None = None, 635 ) -> TaskRun: 636 output_str = ( 637 json.dumps(run_output.output, ensure_ascii=False) 638 if isinstance(run_output.output, dict) 639 else run_output.output 640 ) 641 642 output_source_type = ( 643 DataSourceType.tool_call 644 if self.run_config.type == "mcp" 645 else DataSourceType.synthetic 646 ) 647 648 new_output = TaskOutput( 649 output=output_str, 650 source=DataSource( 651 type=output_source_type, 652 properties=self._properties_for_task_output(), 653 run_config=self.run_config, 654 ), 655 ) 656 657 # Convert input and output to JSON strings if they 
aren't strings 658 input_str = ( 659 input if isinstance(input, str) else json.dumps(input, ensure_ascii=False) 660 ) 661 662 if input_source is None: 663 input_source = DataSource( 664 type=DataSourceType.human, 665 properties={"created_by": Config.shared().user_id}, 666 ) 667 668 parent_task_run_id: str | None = None 669 if parent_task_run is not None: 670 if parent_task_run.id is None: 671 raise ValueError( 672 "parent_task_run must be persisted before using as parent: save the parent " 673 "TaskRun (e.g. save_to_file()) so it has a stable id." 674 ) 675 parent_task_run_id = parent_task_run.id 676 677 return TaskRun( 678 parent=self.task, 679 parent_task_run_id=parent_task_run_id, 680 input=input_str, 681 input_source=input_source, 682 output=new_output, 683 intermediate_outputs=run_output.intermediate_outputs, 684 tags=self.base_adapter_config.default_tags or [], 685 usage=usage, 686 trace=trace, 687 cumulative_usage=MessageUsage.from_trace(trace), 688 ) 689 690 def _properties_for_task_output(self) -> Dict[str, str | int | float]: 691 match self.run_config.type: 692 case "mcp": 693 return {} 694 case "kiln_agent": 695 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 696 raise ValueError("Kiln agent run config is required") 697 run_config = self.run_config 698 699 props: Dict[str, str | int | float] = {} 700 props["adapter_name"] = self.adapter_name() 701 # Legacy properties where we save the run_config details into custom properties. 702 # These are now also be saved in the run_config field. 
703 props["model_name"] = run_config.model_name 704 props["model_provider"] = run_config.model_provider_name 705 props["prompt_id"] = run_config.prompt_id 706 props["structured_output_mode"] = run_config.structured_output_mode 707 props["temperature"] = run_config.temperature 708 props["top_p"] = run_config.top_p 709 710 return props 711 case _: 712 raise_exhaustive_enum_error(self.run_config.type) 713 714 def update_run_config_unknown_structured_output_mode(self) -> None: 715 if self.run_config.type != "kiln_agent": 716 return 717 run_config = as_kiln_agent_run_config(self.run_config) 718 structured_output_mode = run_config.structured_output_mode 719 720 # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it. 721 # Look up our recommended mode from ml_model_list if we have one 722 if structured_output_mode == StructuredOutputMode.unknown: 723 new_run_config = run_config.model_copy(deep=True) 724 structured_output_mode = default_structured_output_mode_for_model_provider( 725 run_config.model_name, 726 run_config.model_provider_name, 727 ) 728 new_run_config.structured_output_mode = structured_output_mode 729 self.run_config = new_run_config 730 731 async def available_tools(self) -> list[KilnToolInterface]: 732 if self.run_config.type != "kiln_agent": 733 return [] 734 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 735 if tool_config is None or tool_config.tools is None: 736 return [] 737 738 non_skill_tool_ids = [ 739 tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX) 740 ] 741 742 tools: list[KilnToolInterface] = [ 743 tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids 744 ] 745 746 skills = self._resolve_skills() 747 if skills: 748 seen_names: set[str] = set() 749 for skill in skills: 750 if skill.name in seen_names: 751 raise ValueError( 752 f"Duplicate skill name '{skill.name}'. Each skill must have a unique name." 
753 ) 754 seen_names.add(skill.name) 755 tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills)) 756 757 tool_names = [await tool.name() for tool in tools] 758 if len(tool_names) != len(set(tool_names)): 759 raise ValueError( 760 "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined." 761 ) 762 763 return tools 764 765 766class OpenAIStreamResult: 767 """Async-iterable wrapper around the OpenAI streaming flow. 768 769 Yields ``ModelResponseStream`` chunks. After iteration the resulting 770 ``TaskRun`` is available via the ``.task_run`` property. 771 772 When return_on_tool_call=True and the model requests tool calls, the stream 773 will stop and ``task_run.is_toolcall_pending`` will be True. 774 """ 775 776 def __init__( 777 self, 778 adapter: BaseAdapter, 779 input: InputType, 780 input_source: DataSource | None, 781 prior_trace: list[ChatCompletionMessageParam] | None, 782 parent_task_run: TaskRun | None = None, 783 ) -> None: 784 self._adapter = adapter 785 self._input = input 786 self._input_source = input_source 787 self._prior_trace = prior_trace 788 self._parent_task_run = parent_task_run 789 self._task_run: TaskRun | None = None 790 791 @property 792 def task_run(self) -> TaskRun: 793 if self._task_run is None: 794 raise RuntimeError( 795 "Stream has not been fully consumed yet. 
" 796 "Iterate over the stream before accessing .task_run" 797 ) 798 return self._task_run 799 800 async def __aiter__(self) -> AsyncIterator[ModelResponseStream]: 801 self._task_run = None 802 is_root_agent = get_agent_run_id() is None 803 if is_root_agent: 804 set_agent_run_id(generate_agent_run_id()) 805 806 try: 807 adapter_stream = self._adapter._prepare_stream( 808 self._input, self._prior_trace 809 ) 810 811 async for event in adapter_stream: 812 if isinstance(event, ModelResponseStream): 813 yield event 814 815 self._task_run = self._adapter._finalize_stream( 816 adapter_stream, self._input, self._input_source, self._parent_task_run 817 ) 818 finally: 819 if is_root_agent: 820 try: 821 run_id = get_agent_run_id() 822 if run_id: 823 await MCPSessionManager.shared().cleanup_session(run_id) 824 finally: 825 clear_agent_run_id() 826 827 828class AiSdkStreamResult: 829 """Async-iterable wrapper around the AI SDK streaming flow. 830 831 Yields ``AiSdkStreamEvent`` instances. After iteration the resulting 832 ``TaskRun`` is available via the ``.task_run`` property. 833 834 When return_on_tool_call=True and the model requests tool calls, the FINISH 835 event will have finishReason: "tool-calls" and ``task_run.is_toolcall_pending`` 836 will be True. 837 """ 838 839 def __init__( 840 self, 841 adapter: BaseAdapter, 842 input: InputType, 843 input_source: DataSource | None, 844 prior_trace: list[ChatCompletionMessageParam] | None, 845 parent_task_run: TaskRun | None = None, 846 ) -> None: 847 self._adapter = adapter 848 self._input = input 849 self._input_source = input_source 850 self._prior_trace = prior_trace 851 self._parent_task_run = parent_task_run 852 self._task_run: TaskRun | None = None 853 854 @property 855 def task_run(self) -> TaskRun: 856 if self._task_run is None: 857 raise RuntimeError( 858 "Stream has not been fully consumed yet. 
" 859 "Iterate over the stream before accessing .task_run" 860 ) 861 return self._task_run 862 863 async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]: 864 self._task_run = None 865 is_root_agent = get_agent_run_id() is None 866 if is_root_agent: 867 set_agent_run_id(generate_agent_run_id()) 868 869 try: 870 adapter_stream = self._adapter._prepare_stream( 871 self._input, self._prior_trace 872 ) 873 874 message_id = f"msg-{uuid.uuid4().hex}" 875 converter = AiSdkStreamConverter() 876 877 yield StartEvent(messageId=message_id) 878 yield StartStepEvent() 879 880 last_event_was_tool_call = False 881 async for event in adapter_stream: 882 if isinstance(event, ModelResponseStream): 883 if last_event_was_tool_call: 884 converter.reset_for_next_step() 885 last_event_was_tool_call = False 886 for ai_event in converter.convert_chunk(event): 887 yield ai_event 888 elif isinstance(event, ToolCallEvent): 889 last_event_was_tool_call = True 890 for ai_event in converter.convert_tool_event(event): 891 yield ai_event 892 893 for ai_event in converter.close_open_blocks(): 894 yield ai_event 895 896 yield FinishStepEvent() 897 898 self._task_run = self._adapter._finalize_stream( 899 adapter_stream, self._input, self._input_source, self._parent_task_run 900 ) 901 902 if self._task_run.is_toolcall_pending: 903 yield FinishEvent( 904 messageMetadata=FinishMessageMetadata(finishReason="tool-calls"), 905 ) 906 else: 907 for ai_event in converter.finalize(): 908 yield ai_event 909 finally: 910 if is_root_agent: 911 try: 912 run_id = get_agent_run_id() 913 if run_id: 914 await MCPSessionManager.shared().cleanup_session(run_id) 915 finally: 916 clear_agent_run_id()
@dataclass
class AdapterConfig:
    """
    An adapter config is config options that do NOT impact the output of the model.

    For example: if it's saved, or if we request additional data like logprobs.

    Field documentation is written as attribute docstrings placed directly
    *after* each field, so documentation tools attach it to the right field.
    """

    allow_saving: bool = True
    """
    When False, runs produced by the adapter are never written to disk. When True,
    a run is saved only if autosave is also enabled in ``Config`` and the task has a path.
    """

    top_logprobs: int | None = None
    """
    Number of top logprobs to request. Presumably consumed by concrete adapters
    when building the model request (confirm against the adapter implementations).
    """

    default_tags: list[str] | None = None
    """
    Tags attached to every ``TaskRun`` generated by the adapter (none when unset).
    """

    prompt_builder: BasePromptBuilder | None = None
    """
    A custom prompt builder can be injected to override the system prompt building process.
    If not provided, the prompt builder will be created from the run_config.prompt_id which
    may load additional files from disk.
    """

    skills: SkillsDict | None = None
    """
    Pre-loaded skills keyed by skill ID. When the run config references skills,
    they are looked up from this dict instead of reading from the filesystem.
    Use load_skills_for_task() to build this dict.
    """

    return_on_tool_call: bool = False
    """
    When True, the adapter will stop and return control to the caller when a tool call
    is invoked, instead of processing tool calls internally. Default is False (process
    tool calls internally).
    """

    unmanaged_tools: list[KilnToolInterface] | None = None
    """
    Extra tools provided directly by the caller, in addition to tools resolved from the
    task's tool registry. These are sent to the model together with registry tools, and
    their names must not collide with registry tool names.

    If ``return_on_tool_call`` is False (the default), the adapter executes these tools
    itself just like registry tools. If True, the adapter returns as soon as the model
    requests a tool call and the caller is responsible for running the tool and passing
    results back via ``prior_trace``.
    """

    automatic_prompt_caching: bool = False
    """
    When True, automatically inject prompt caching hints into completion
    requests. This is a cost optimization and does not affect model output.
    """
An adapter config is config options that do NOT impact the output of the model.
For example: if it's saved, or if we request additional data like logprobs.
Pre-loaded skills keyed by skill ID. When the run config references skills, they are looked up from this dict instead of reading from the filesystem. Use load_skills_for_task() to build this dict.
When True, the adapter will stop and return control to the caller when a tool call is invoked, instead of processing tool calls internally. Default is False (process tool calls internally).
Extra tools provided directly by the caller, in addition to tools resolved from the task's tool registry. These are sent to the model together with registry tools, and their names must not collide with registry tool names.
If return_on_tool_call is False (the default), the adapter executes these tools
itself just like registry tools. If True, the adapter returns as soon as the model
requests a tool call and the caller is responsible for running the tool and passing
results back via prior_trace.
131class BaseAdapter(metaclass=ABCMeta): 132 """Base class for AI model adapters that handle task execution. 133 134 This abstract class provides the foundation for implementing model-specific adapters 135 that can process tasks with structured or unstructured inputs/outputs. It handles 136 input/output validation, prompt building, and run tracking. 137 138 Prompt building is handled internally by the adapter, which uses a prompt builder 139 based on the run config. To override the prompt building behavior, pass a custom prompt 140 builder to the adapter config. 141 """ 142 143 def __init__( 144 self, 145 task: Task, 146 run_config: RunConfigProperties, 147 config: AdapterConfig | None = None, 148 ): 149 self.task = task 150 self.run_config: RunConfigProperties = run_config 151 self.base_adapter_config = config or AdapterConfig() 152 153 if isinstance(run_config, KilnAgentRunConfigProperties): 154 self.update_run_config_unknown_structured_output_mode() 155 self.prompt_builder = ( 156 self.base_adapter_config.prompt_builder 157 or prompt_builder_from_id(run_config.prompt_id, task) 158 ) 159 else: 160 self.prompt_builder = None 161 self._model_provider: KilnModelProvider | None = None 162 self._resolved_skills: list[Skill] | None = None 163 164 self.output_schema = task.output_json_schema 165 self.input_schema = task.input_json_schema 166 167 def model_provider(self) -> KilnModelProvider: 168 """ 169 Lazy load the model provider for this adapter. 
170 """ 171 if self._model_provider is not None: 172 return self._model_provider 173 run_config = as_kiln_agent_run_config(self.run_config) 174 if not run_config.model_name or not run_config.model_provider_name: 175 raise ValueError("model_name and model_provider_name must be provided") 176 self._model_provider = kiln_model_provider_from( 177 run_config.model_name, run_config.model_provider_name 178 ) 179 if not self._model_provider: 180 raise ValueError( 181 f"model_provider_name {run_config.model_provider_name} not found for model {run_config.model_name}" 182 ) 183 return self._model_provider 184 185 @staticmethod 186 def _normalize_prior_trace( 187 prior_trace: list[ChatCompletionMessageParam] | None, 188 ) -> list[ChatCompletionMessageParam] | None: 189 if not prior_trace: 190 return None 191 return prior_trace 192 193 def _reject_multiturn_with_structured_input( 194 self, 195 prior_trace: list[ChatCompletionMessageParam] | None, 196 ) -> None: 197 if prior_trace is not None and self.input_schema is not None: 198 raise ValueError( 199 "Cannot run multiturn execution with a task that has a structured input schema. " 200 "Use an unstructured task, or call without prior_trace." 
201 ) 202 203 async def invoke( 204 self, 205 input: InputType, 206 input_source: DataSource | None = None, 207 prior_trace: list[ChatCompletionMessageParam] | None = None, 208 parent_task_run: TaskRun | None = None, 209 ) -> TaskRun: 210 task_run, _ = await self.invoke_returning_run_output( 211 input, input_source, prior_trace, parent_task_run 212 ) 213 return task_run 214 215 async def _run_returning_run_output( 216 self, 217 input: InputType, 218 input_source: DataSource | None = None, 219 prior_trace: list[ChatCompletionMessageParam] | None = None, 220 parent_task_run: TaskRun | None = None, 221 ) -> Tuple[TaskRun, RunOutput]: 222 prior_trace = self._normalize_prior_trace(prior_trace) 223 self._reject_multiturn_with_structured_input(prior_trace) 224 225 # validate input, allowing arrays 226 if self.input_schema is not None: 227 validate_schema_with_value_error( 228 input, 229 self.input_schema, 230 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 231 require_object=False, 232 ) 233 234 # Format model input for model call (we save the original input in the task without formatting) 235 formatted_input = input 236 formatter_id = self.model_provider().formatter 237 if formatter_id is not None: 238 formatter = request_formatter_from_id(formatter_id) 239 formatted_input = formatter.format_input(input) 240 241 # Run 242 run_output, usage = await self._run(formatted_input, prior_trace=prior_trace) 243 244 if not run_output.is_toolcall_pending: 245 # Normal completion: parse and validate output 246 provider = self.model_provider() 247 parser = model_parser_from_id(provider.parser) 248 parsed_output = parser.parse_output(original_output=run_output) 249 250 # validate output 251 if self.output_schema is not None: 252 # Parse json to dict if we have structured output 253 if isinstance(parsed_output.output, str): 254 parsed_output.output = parse_json_string(parsed_output.output) 255 256 if not isinstance(parsed_output.output, dict): 257 raise RuntimeError( 258 f"structured response is not a dict: {parsed_output.output}" 259 ) 260 validate_schema_with_value_error( 261 parsed_output.output, 262 self.output_schema, 263 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 264 ) 265 else: 266 if not isinstance(parsed_output.output, str): 267 raise RuntimeError( 268 f"response is not a string for non-structured task: {parsed_output.output}" 269 ) 270 271 trace_has_toolcalls = parsed_output.trace is not None and any( 272 message.get("role", None) == "tool" for message in parsed_output.trace 273 ) 274 275 # Validate reasoning content is present and required 276 # We don't require reasoning when using tools as models tend not to return any on the final turn (both Sonnet and Gemini). 
277 if ( 278 provider.reasoning_capable 279 and ( 280 not parsed_output.intermediate_outputs 281 or "reasoning" not in parsed_output.intermediate_outputs 282 ) 283 and not ( 284 provider.reasoning_optional_for_structured_output 285 and self.has_structured_output() 286 ) 287 and not trace_has_toolcalls 288 ): 289 raise RuntimeError( 290 "Reasoning is required for this model, but no reasoning was returned." 291 ) 292 293 run_output = parsed_output 294 295 run = self.generate_run( 296 input, input_source, run_output, usage, run_output.trace, parent_task_run 297 ) 298 299 # Save the run if configured to do so, and we have a path to save to 300 if ( 301 self.base_adapter_config.allow_saving 302 and Config.shared().autosave_runs 303 and self.task.path is not None 304 ): 305 run.save_to_file() 306 else: 307 # Clear the ID to indicate it's not persisted 308 run.id = None 309 310 return run, run_output 311 312 async def invoke_returning_run_output( 313 self, 314 input: InputType, 315 input_source: DataSource | None = None, 316 prior_trace: list[ChatCompletionMessageParam] | None = None, 317 parent_task_run: TaskRun | None = None, 318 ) -> Tuple[TaskRun, RunOutput]: 319 # Determine if this is the root agent (no existing run context) 320 is_root_agent = get_agent_run_id() is None 321 322 if is_root_agent: 323 run_id = generate_agent_run_id() 324 set_agent_run_id(run_id) 325 326 try: 327 return await self._run_returning_run_output( 328 input, input_source, prior_trace, parent_task_run 329 ) 330 finally: 331 if is_root_agent: 332 try: 333 run_id = get_agent_run_id() 334 if run_id: 335 await MCPSessionManager.shared().cleanup_session(run_id) 336 finally: 337 clear_agent_run_id() 338 339 def invoke_openai_stream( 340 self, 341 input: InputType, 342 input_source: DataSource | None = None, 343 prior_trace: list[ChatCompletionMessageParam] | None = None, 344 parent_task_run: TaskRun | None = None, 345 ) -> OpenAIStreamResult: 346 """Stream raw OpenAI-protocol chunks for the task 
execution. 347 348 Returns an async-iterable that yields ``ModelResponseStream`` chunks 349 as they arrive from the model. After the iterator is exhausted the 350 run has been validated and saved (when configured). The resulting 351 ``TaskRun`` is available via the ``.task_run`` property. 352 353 Tool-call rounds happen internally and are not surfaced; use 354 ``invoke_ai_sdk_stream`` if you need tool-call events. 355 """ 356 return OpenAIStreamResult( 357 self, input, input_source, prior_trace, parent_task_run 358 ) 359 360 def invoke_ai_sdk_stream( 361 self, 362 input: InputType, 363 input_source: DataSource | None = None, 364 prior_trace: list[ChatCompletionMessageParam] | None = None, 365 parent_task_run: TaskRun | None = None, 366 ) -> AiSdkStreamResult: 367 """Stream AI SDK protocol events for the task execution. 368 369 Returns an async-iterable that yields ``AiSdkStreamEvent`` instances 370 covering text, reasoning, tool-call lifecycle, step boundaries, and 371 control events. After the iterator is exhausted the resulting 372 ``TaskRun`` is available via the ``.task_run`` property. 373 """ 374 return AiSdkStreamResult( 375 self, input, input_source, prior_trace, parent_task_run 376 ) 377 378 def _prepare_stream( 379 self, 380 input: InputType, 381 prior_trace: list[ChatCompletionMessageParam] | None, 382 ) -> AdapterStream: 383 prior_trace = self._normalize_prior_trace(prior_trace) 384 self._reject_multiturn_with_structured_input(prior_trace) 385 386 if self.input_schema is not None: 387 validate_schema_with_value_error( 388 input, 389 self.input_schema, 390 "This task requires a specific input schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 391 require_object=False, 392 ) 393 394 formatted_input = input 395 formatter_id = self.model_provider().formatter 396 if formatter_id is not None: 397 formatter = request_formatter_from_id(formatter_id) 398 formatted_input = formatter.format_input(input) 399 400 return self._create_run_stream(formatted_input, prior_trace) 401 402 def _finalize_stream( 403 self, 404 adapter_stream: AdapterStream, 405 input: InputType, 406 input_source: DataSource | None, 407 parent_task_run: TaskRun | None = None, 408 ) -> TaskRun: 409 """Streaming invocations are only concerned with passing through events as they come in. 410 At the end of the stream, we still need to validate the output, create a run and everything 411 else that a non-streaming invocation would do. 412 """ 413 414 result: AdapterStreamResult = adapter_stream.result 415 run_output = result.run_output 416 usage = result.usage 417 418 if not run_output.is_toolcall_pending: 419 # Normal completion: parse and validate output 420 provider = self.model_provider() 421 parser = model_parser_from_id(provider.parser) 422 parsed_output = parser.parse_output(original_output=run_output) 423 424 if self.output_schema is not None: 425 if isinstance(parsed_output.output, str): 426 parsed_output.output = parse_json_string(parsed_output.output) 427 if not isinstance(parsed_output.output, dict): 428 raise RuntimeError( 429 f"structured response is not a dict: {parsed_output.output}" 430 ) 431 validate_schema_with_value_error( 432 parsed_output.output, 433 self.output_schema, 434 "This task requires a specific output schema. While the model produced JSON, that JSON didn't meet the schema. 
Search 'Troubleshooting Structured Data Issues' in our docs for more information.", 435 ) 436 else: 437 if not isinstance(parsed_output.output, str): 438 raise RuntimeError( 439 f"response is not a string for non-structured task: {parsed_output.output}" 440 ) 441 442 trace_has_toolcalls = parsed_output.trace is not None and any( 443 message.get("role", None) == "tool" for message in parsed_output.trace 444 ) 445 if ( 446 provider.reasoning_capable 447 and ( 448 not parsed_output.intermediate_outputs 449 or "reasoning" not in parsed_output.intermediate_outputs 450 ) 451 and not ( 452 provider.reasoning_optional_for_structured_output 453 and self.has_structured_output() 454 ) 455 and not trace_has_toolcalls 456 ): 457 raise RuntimeError( 458 "Reasoning is required for this model, but no reasoning was returned." 459 ) 460 461 run_output = parsed_output 462 463 run = self.generate_run( 464 input, input_source, run_output, usage, run_output.trace, parent_task_run 465 ) 466 467 if ( 468 self.base_adapter_config.allow_saving 469 and Config.shared().autosave_runs 470 and self.task.path is not None 471 ): 472 run.save_to_file() 473 else: 474 run.id = None 475 476 return run 477 478 def has_structured_output(self) -> bool: 479 return self.output_schema is not None 480 481 @abstractmethod 482 def adapter_name(self) -> str: 483 pass 484 485 @abstractmethod 486 async def _run( 487 self, 488 input: InputType, 489 prior_trace: list[ChatCompletionMessageParam] | None = None, 490 ) -> Tuple[RunOutput, Usage | None]: 491 pass 492 493 def _create_run_stream( 494 self, 495 input: InputType, 496 prior_trace: list[ChatCompletionMessageParam] | None = None, 497 ) -> AdapterStream: 498 """Create a stream for the adapter. 
Implementations must override this method to support streaming.""" 499 raise NotImplementedError("Streaming is not supported for this adapter type") 500 501 def build_prompt(self) -> str: 502 if self.prompt_builder is None: 503 raise ValueError("Prompt builder is not available for MCP run config") 504 # The prompt builder needs to know if we want to inject formatting instructions 505 structured_output_mode = as_kiln_agent_run_config( 506 self.run_config 507 ).structured_output_mode 508 add_json_instructions = self.has_structured_output() and ( 509 structured_output_mode == StructuredOutputMode.json_instructions 510 or structured_output_mode 511 == StructuredOutputMode.json_instruction_and_object 512 ) 513 514 return self.prompt_builder.build_prompt( 515 include_json_instructions=add_json_instructions, 516 skills=self._resolve_skills(), 517 ) 518 519 def _resolve_skills(self) -> list[Skill]: 520 """Resolve skills from the injected skills dict. 521 522 Uses the pre-loaded skills dict from AdapterConfig. Caches the result 523 so that build_prompt and available_tools don't repeat 524 the lookup. Raises ValueError if the run config references a skill 525 that is not in the injected dict. 526 """ 527 if self._resolved_skills is not None: 528 return self._resolved_skills 529 530 if self.run_config.type != "kiln_agent": 531 self._resolved_skills = [] 532 return self._resolved_skills 533 534 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 535 if tool_config is None or tool_config.tools is None: 536 self._resolved_skills = [] 537 return self._resolved_skills 538 539 skill_tool_ids = [ 540 tid for tid in tool_config.tools if tid.startswith(SKILL_TOOL_ID_PREFIX) 541 ] 542 if not skill_tool_ids: 543 self._resolved_skills = [] 544 return self._resolved_skills 545 546 injected = self.base_adapter_config.skills 547 if injected is None: 548 raise ValueError( 549 "Run config references skills but no skills dict was provided via " 550 "AdapterConfig(skills=...). 
Use load_skills_for_task() to pre-load " 551 "skills and pass them to the adapter." 552 ) 553 554 skills: list[Skill] = [] 555 seen: set[str] = set() 556 for tool_id in skill_tool_ids: 557 sid = skill_id_from_tool_id(tool_id) 558 if sid not in injected: 559 raise ValueError( 560 f"Skill {sid} referenced in run config but not found in the " 561 "injected skills dict." 562 ) 563 if sid in seen: 564 continue 565 seen.add(sid) 566 skills.append(injected[sid]) 567 568 self._resolved_skills = skills 569 return self._resolved_skills 570 571 def build_chat_formatter( 572 self, 573 input: InputType, 574 prior_trace: list[ChatCompletionMessageParam] | None = None, 575 ) -> ChatFormatter: 576 prior_trace = self._normalize_prior_trace(prior_trace) 577 self._reject_multiturn_with_structured_input(prior_trace) 578 if prior_trace is not None: 579 return MultiturnFormatter(prior_trace, input) 580 if self.prompt_builder is None: 581 raise ValueError("Prompt builder is not available for MCP run config") 582 # Determine the chat strategy to use based on the prompt the user selected, the model's capabilities, and if the model was finetuned with a specific chat strategy. 583 584 cot_prompt = self.prompt_builder.chain_of_thought_prompt() 585 system_message = self.build_prompt() 586 587 # If no COT prompt, use the single turn strategy. Even when a tuned strategy is set, as the tuned strategy is either already single turn, or won't work without a COT prompt. 588 if not cot_prompt: 589 return get_chat_formatter( 590 strategy=ChatStrategy.single_turn, 591 system_message=system_message, 592 user_input=input, 593 ) 594 595 # Some models like finetunes are trained with a specific chat strategy. Use that. 596 # However, don't use that if it is single turn. The user selected a COT prompt, and we give explicit prompt selection priority over the tuned strategy. 
597 tuned_chat_strategy = self.model_provider().tuned_chat_strategy 598 if tuned_chat_strategy and tuned_chat_strategy != ChatStrategy.single_turn: 599 return get_chat_formatter( 600 strategy=tuned_chat_strategy, 601 system_message=system_message, 602 user_input=input, 603 thinking_instructions=cot_prompt, 604 ) 605 606 # Pick the best chat strategy for the model given it has a cot prompt. 607 reasoning_capable = self.model_provider().reasoning_capable 608 if reasoning_capable: 609 # "Thinking" LLM designed to output thinking in a structured format. We'll use it's native format. 610 # A simple message with the COT prompt appended to the message list is sufficient 611 return get_chat_formatter( 612 strategy=ChatStrategy.single_turn_r1_thinking, 613 system_message=system_message, 614 user_input=input, 615 thinking_instructions=cot_prompt, 616 ) 617 else: 618 # Unstructured output with COT 619 # Two calls to separate the thinking from the final response 620 return get_chat_formatter( 621 strategy=ChatStrategy.two_message_cot, 622 system_message=system_message, 623 user_input=input, 624 thinking_instructions=cot_prompt, 625 ) 626 627 # create a run and task output 628 def generate_run( 629 self, 630 input: InputType, 631 input_source: DataSource | None, 632 run_output: RunOutput, 633 usage: Usage | None = None, 634 trace: list[ChatCompletionMessageParam] | None = None, 635 parent_task_run: TaskRun | None = None, 636 ) -> TaskRun: 637 output_str = ( 638 json.dumps(run_output.output, ensure_ascii=False) 639 if isinstance(run_output.output, dict) 640 else run_output.output 641 ) 642 643 output_source_type = ( 644 DataSourceType.tool_call 645 if self.run_config.type == "mcp" 646 else DataSourceType.synthetic 647 ) 648 649 new_output = TaskOutput( 650 output=output_str, 651 source=DataSource( 652 type=output_source_type, 653 properties=self._properties_for_task_output(), 654 run_config=self.run_config, 655 ), 656 ) 657 658 # Convert input and output to JSON strings if they 
aren't strings 659 input_str = ( 660 input if isinstance(input, str) else json.dumps(input, ensure_ascii=False) 661 ) 662 663 if input_source is None: 664 input_source = DataSource( 665 type=DataSourceType.human, 666 properties={"created_by": Config.shared().user_id}, 667 ) 668 669 parent_task_run_id: str | None = None 670 if parent_task_run is not None: 671 if parent_task_run.id is None: 672 raise ValueError( 673 "parent_task_run must be persisted before using as parent: save the parent " 674 "TaskRun (e.g. save_to_file()) so it has a stable id." 675 ) 676 parent_task_run_id = parent_task_run.id 677 678 return TaskRun( 679 parent=self.task, 680 parent_task_run_id=parent_task_run_id, 681 input=input_str, 682 input_source=input_source, 683 output=new_output, 684 intermediate_outputs=run_output.intermediate_outputs, 685 tags=self.base_adapter_config.default_tags or [], 686 usage=usage, 687 trace=trace, 688 cumulative_usage=MessageUsage.from_trace(trace), 689 ) 690 691 def _properties_for_task_output(self) -> Dict[str, str | int | float]: 692 match self.run_config.type: 693 case "mcp": 694 return {} 695 case "kiln_agent": 696 if not isinstance(self.run_config, KilnAgentRunConfigProperties): 697 raise ValueError("Kiln agent run config is required") 698 run_config = self.run_config 699 700 props: Dict[str, str | int | float] = {} 701 props["adapter_name"] = self.adapter_name() 702 # Legacy properties where we save the run_config details into custom properties. 703 # These are now also be saved in the run_config field. 
704 props["model_name"] = run_config.model_name 705 props["model_provider"] = run_config.model_provider_name 706 props["prompt_id"] = run_config.prompt_id 707 props["structured_output_mode"] = run_config.structured_output_mode 708 props["temperature"] = run_config.temperature 709 props["top_p"] = run_config.top_p 710 711 return props 712 case _: 713 raise_exhaustive_enum_error(self.run_config.type) 714 715 def update_run_config_unknown_structured_output_mode(self) -> None: 716 if self.run_config.type != "kiln_agent": 717 return 718 run_config = as_kiln_agent_run_config(self.run_config) 719 structured_output_mode = run_config.structured_output_mode 720 721 # Old datamodels didn't save the structured output mode. Some clients (tests, end users) might not set it. 722 # Look up our recommended mode from ml_model_list if we have one 723 if structured_output_mode == StructuredOutputMode.unknown: 724 new_run_config = run_config.model_copy(deep=True) 725 structured_output_mode = default_structured_output_mode_for_model_provider( 726 run_config.model_name, 727 run_config.model_provider_name, 728 ) 729 new_run_config.structured_output_mode = structured_output_mode 730 self.run_config = new_run_config 731 732 async def available_tools(self) -> list[KilnToolInterface]: 733 if self.run_config.type != "kiln_agent": 734 return [] 735 tool_config = as_kiln_agent_run_config(self.run_config).tools_config 736 if tool_config is None or tool_config.tools is None: 737 return [] 738 739 non_skill_tool_ids = [ 740 tid for tid in tool_config.tools if not tid.startswith(SKILL_TOOL_ID_PREFIX) 741 ] 742 743 tools: list[KilnToolInterface] = [ 744 tool_from_id(tool_id, self.task) for tool_id in non_skill_tool_ids 745 ] 746 747 skills = self._resolve_skills() 748 if skills: 749 seen_names: set[str] = set() 750 for skill in skills: 751 if skill.name in seen_names: 752 raise ValueError( 753 f"Duplicate skill name '{skill.name}'. Each skill must have a unique name." 
754 ) 755 seen_names.add(skill.name) 756 tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills)) 757 758 tool_names = [await tool.name() for tool in tools] 759 if len(tool_names) != len(set(tool_names)): 760 raise ValueError( 761 "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined." 762 ) 763 764 return tools
Base class for AI model adapters that handle task execution.
This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.
Prompt building is handled internally by the adapter, which uses a prompt builder based on the run config. To override the prompt building behavior, pass a custom prompt builder to the adapter config.
def model_provider(self) -> KilnModelProvider:
    """Return the model provider for this adapter, resolving it on first use.

    The resolved provider is cached on the instance, so later calls return the
    same object without another lookup.

    Raises:
        ValueError: if the run config has no model name or provider name, or
            if no provider is found for that model/provider pair.
    """
    cached = self._model_provider
    if cached is not None:
        return cached

    agent_config = as_kiln_agent_run_config(self.run_config)
    model_name = agent_config.model_name
    provider_name = agent_config.model_provider_name
    if not (model_name and provider_name):
        raise ValueError("model_name and model_provider_name must be provided")

    self._model_provider = kiln_model_provider_from(model_name, provider_name)
    if not self._model_provider:
        raise ValueError(
            f"model_provider_name {provider_name} not found for model {model_name}"
        )
    return self._model_provider
Lazy load the model provider for this adapter.
async def invoke(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> TaskRun:
    """Run the task and return only the TaskRun.

    Delegates to ``invoke_returning_run_output`` and drops the second element
    of the pair it returns.
    """
    run_and_output = await self.invoke_returning_run_output(
        input, input_source, prior_trace, parent_task_run
    )
    run, _unused_run_output = run_and_output
    return run
async def invoke_returning_run_output(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> Tuple[TaskRun, RunOutput]:
    """Run the task and return both the TaskRun and the RunOutput.

    When no agent run id is set on entry, this call is the root agent: it sets
    a fresh run id, and on exit cleans up the MCP session for that id and
    clears it. Nested calls reuse the existing id and leave cleanup to the root.
    """
    # No run id yet means nobody above us owns the run context.
    owns_run_context = get_agent_run_id() is None
    if owns_run_context:
        set_agent_run_id(generate_agent_run_id())

    try:
        return await self._run_returning_run_output(
            input, input_source, prior_trace, parent_task_run
        )
    finally:
        if owns_run_context:
            try:
                active_run_id = get_agent_run_id()
                if active_run_id:
                    await MCPSessionManager.shared().cleanup_session(active_run_id)
            finally:
                # The id is cleared even when session cleanup raises.
                clear_agent_run_id()
def invoke_openai_stream(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> OpenAIStreamResult:
    """Start a task execution that streams raw OpenAI-protocol chunks.

    The returned object is an async-iterable of ``ModelResponseStream``
    chunks, yielded as the model produces them. Once iteration finishes,
    the run has been validated and (when configured) saved, and the
    ``TaskRun`` can be read from the ``.task_run`` property.

    Tool-call rounds are handled internally and do not appear in the
    stream; use ``invoke_ai_sdk_stream`` when tool-call events are needed.
    """
    stream_result = OpenAIStreamResult(
        self, input, input_source, prior_trace, parent_task_run
    )
    return stream_result
Stream raw OpenAI-protocol chunks for the task execution.
Returns an async-iterable that yields ModelResponseStream chunks
as they arrive from the model. After the iterator is exhausted the
run has been validated and saved (when configured). The resulting
TaskRun is available via the .task_run property.
Tool-call rounds happen internally and are not surfaced; use
invoke_ai_sdk_stream if you need tool-call events.
def invoke_ai_sdk_stream(
    self,
    input: InputType,
    input_source: DataSource | None = None,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> AiSdkStreamResult:
    """Start a task execution that streams AI SDK protocol events.

    The returned object is an async-iterable of ``AiSdkStreamEvent``
    instances: text, reasoning, tool-call lifecycle, step boundaries, and
    control events. Once iteration finishes, the ``TaskRun`` can be read
    from the ``.task_run`` property.
    """
    stream_result = AiSdkStreamResult(
        self, input, input_source, prior_trace, parent_task_run
    )
    return stream_result
Stream AI SDK protocol events for the task execution.
Returns an async-iterable that yields AiSdkStreamEvent instances
covering text, reasoning, tool-call lifecycle, step boundaries, and
control events. After the iterator is exhausted the resulting
TaskRun is available via the .task_run property.
def build_prompt(self) -> str:
    """Build the system prompt via the adapter's prompt builder.

    JSON formatting instructions are requested only when the task has a
    structured output and the run config's structured output mode is one of
    the two JSON-instruction modes.

    Raises:
        ValueError: if the adapter has no prompt builder.
    """
    builder = self.prompt_builder
    if builder is None:
        raise ValueError("Prompt builder is not available for MCP run config")

    # The prompt builder needs to know if we want to inject formatting instructions
    mode = as_kiln_agent_run_config(self.run_config).structured_output_mode
    instruction_modes = (
        StructuredOutputMode.json_instructions,
        StructuredOutputMode.json_instruction_and_object,
    )
    wants_json_instructions = (
        self.has_structured_output() and mode in instruction_modes
    )

    return builder.build_prompt(
        include_json_instructions=wants_json_instructions,
        skills=self._resolve_skills(),
    )
def build_chat_formatter(
    self,
    input: InputType,
    prior_trace: list[ChatCompletionMessageParam] | None = None,
) -> ChatFormatter:
    """Choose and build the chat formatter for this call.

    A non-empty prior trace always yields a ``MultiturnFormatter``. Otherwise
    the strategy is chosen from the chain-of-thought prompt, the provider's
    tuned chat strategy, and whether the provider is reasoning capable.

    Raises:
        ValueError: if no prompt builder is available, or whatever
            ``_reject_multiturn_with_structured_input`` raises.
    """
    history = self._normalize_prior_trace(prior_trace)
    self._reject_multiturn_with_structured_input(history)
    if history is not None:
        return MultiturnFormatter(history, input)

    if self.prompt_builder is None:
        raise ValueError("Prompt builder is not available for MCP run config")

    # The strategy depends on the prompt the user selected, the model's
    # capabilities, and whether the model was finetuned with a specific strategy.
    thinking_prompt = self.prompt_builder.chain_of_thought_prompt()
    system_prompt = self.build_prompt()

    # No COT prompt: always single turn. A tuned strategy is either already
    # single turn, or won't work without a COT prompt.
    if not thinking_prompt:
        return get_chat_formatter(
            strategy=ChatStrategy.single_turn,
            system_message=system_prompt,
            user_input=input,
        )

    # Finetunes may be trained with a specific chat strategy; use it unless it
    # is single turn, because the user's explicit COT prompt takes priority.
    tuned_strategy = self.model_provider().tuned_chat_strategy
    if tuned_strategy and tuned_strategy != ChatStrategy.single_turn:
        chosen_strategy = tuned_strategy
    elif self.model_provider().reasoning_capable:
        # "Thinking" LLM that outputs thinking in a structured format: use its
        # native format; appending the COT prompt as a message is sufficient.
        chosen_strategy = ChatStrategy.single_turn_r1_thinking
    else:
        # Unstructured output with COT: two calls separate the thinking from
        # the final response.
        chosen_strategy = ChatStrategy.two_message_cot

    return get_chat_formatter(
        strategy=chosen_strategy,
        system_message=system_prompt,
        user_input=input,
        thinking_instructions=thinking_prompt,
    )
def generate_run(
    self,
    input: InputType,
    input_source: DataSource | None,
    run_output: RunOutput,
    usage: Usage | None = None,
    trace: list[ChatCompletionMessageParam] | None = None,
    parent_task_run: TaskRun | None = None,
) -> TaskRun:
    """Assemble a TaskRun (and its TaskOutput) for the given input and run output.

    Nothing is saved here; the caller decides whether to persist the run.

    Raises:
        ValueError: if ``parent_task_run`` is provided but has no id.
    """
    # Dict outputs are stored as JSON text; anything else is passed through.
    raw_output = run_output.output
    if isinstance(raw_output, dict):
        serialized_output = json.dumps(raw_output, ensure_ascii=False)
    else:
        serialized_output = raw_output

    if self.run_config.type == "mcp":
        source_type = DataSourceType.tool_call
    else:
        source_type = DataSourceType.synthetic

    output_source = DataSource(
        type=source_type,
        properties=self._properties_for_task_output(),
        run_config=self.run_config,
    )
    task_output = TaskOutput(output=serialized_output, source=output_source)

    # Non-string inputs are stored as JSON text as well.
    if isinstance(input, str):
        serialized_input = input
    else:
        serialized_input = json.dumps(input, ensure_ascii=False)

    # Without an explicit source, attribute the input to the configured user.
    if input_source is None:
        input_source = DataSource(
            type=DataSourceType.human,
            properties={"created_by": Config.shared().user_id},
        )

    parent_id: str | None = None
    if parent_task_run is not None:
        parent_id = parent_task_run.id
        if parent_id is None:
            raise ValueError(
                "parent_task_run must be persisted before using as parent: save the parent "
                "TaskRun (e.g. save_to_file()) so it has a stable id."
            )

    return TaskRun(
        parent=self.task,
        parent_task_run_id=parent_id,
        input=serialized_input,
        input_source=input_source,
        output=task_output,
        intermediate_outputs=run_output.intermediate_outputs,
        tags=self.base_adapter_config.default_tags or [],
        usage=usage,
        trace=trace,
        cumulative_usage=MessageUsage.from_trace(trace),
    )
def update_run_config_unknown_structured_output_mode(self) -> None:
    """Replace an ``unknown`` structured output mode with the provider default.

    Does nothing unless ``self.run_config`` is a ``kiln_agent`` config whose
    ``structured_output_mode`` is ``StructuredOutputMode.unknown``. In that
    case ``self.run_config`` is rebound to a deep copy carrying the mode
    returned by ``default_structured_output_mode_for_model_provider`` for the
    config's model name and provider name; the previous config object is not
    mutated.
    """
    if self.run_config.type != "kiln_agent":
        return

    agent_config = as_kiln_agent_run_config(self.run_config)
    if agent_config.structured_output_mode != StructuredOutputMode.unknown:
        return

    # Old datamodels didn't save the structured output mode, and some clients
    # (tests, end users) might not set it, so fall back to the recommended
    # mode from ml_model_list.
    updated_config = agent_config.model_copy(deep=True)
    updated_config.structured_output_mode = (
        default_structured_output_mode_for_model_provider(
            agent_config.model_name,
            agent_config.model_provider_name,
        )
    )
    self.run_config = updated_config
async def available_tools(self) -> list[KilnToolInterface]:
    """Build the tools made available by the current run config.

    Returns an empty list when the run config is not a ``kiln_agent`` config
    or when it has no tools configured. Otherwise every configured tool id
    that does not start with ``SKILL_TOOL_ID_PREFIX`` is turned into a tool
    with ``tool_from_id``. If ``self._resolve_skills()`` returns any skills,
    they are all wrapped in one additional ``SkillTool``.

    Raises:
        ValueError: If two skills share a name, or if two of the resulting
            tools report the same name.
    """
    if self.run_config.type != "kiln_agent":
        return []

    tools_config = as_kiln_agent_run_config(self.run_config).tools_config
    if tools_config is None or tools_config.tools is None:
        return []

    # Skill ids are handled separately below; everything else maps 1:1 to a tool.
    tools: list[KilnToolInterface] = [
        tool_from_id(tool_id, self.task)
        for tool_id in tools_config.tools
        if not tool_id.startswith(SKILL_TOOL_ID_PREFIX)
    ]

    skills = self._resolve_skills()
    if skills:
        names_seen: set[str] = set()
        for skill in skills:
            if skill.name in names_seen:
                raise ValueError(
                    f"Duplicate skill name '{skill.name}'. Each skill must have a unique name."
                )
            names_seen.add(skill.name)
        # All skills are exposed through a single combined tool.
        tools.append(SkillTool(f"{SKILL_TOOL_ID_PREFIX}_combined", skills))

    # Tool names are resolved asynchronously, so collect them all before comparing.
    names = [await tool.name() for tool in tools]
    if len(set(names)) != len(names):
        raise ValueError(
            "Each tool must have a unique name. Either de-select the duplicate tools, or modify their names to describe their unique purpose. Model will struggle if tools do not have descriptive names and tool execution will be undefined."
        )

    return tools
class OpenAIStreamResult:
    """Async-iterable wrapper that streams a run in the OpenAI chunk format.

    Iterating yields ``ModelResponseStream`` chunks. Once iteration has
    finished, the resulting ``TaskRun`` can be read from ``.task_run``.

    When return_on_tool_call=True and the model requests tool calls, the
    stream stops and ``task_run.is_toolcall_pending`` will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        """Store the run arguments; nothing is executed until iteration."""
        # Filled in only after the stream has been fully consumed.
        self._task_run: TaskRun | None = None
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run

    @property
    def task_run(self) -> TaskRun:
        """The finished ``TaskRun``.

        Raises:
            RuntimeError: If the stream has not been fully iterated yet.
        """
        if self._task_run is not None:
            return self._task_run
        raise RuntimeError(
            "Stream has not been fully consumed yet. "
            "Iterate over the stream before accessing .task_run"
        )

    async def __aiter__(self) -> AsyncIterator[ModelResponseStream]:
        """Run the adapter stream, yielding its ``ModelResponseStream`` chunks.

        Any previously stored ``TaskRun`` is discarded at the start. If no
        agent run id is active, one is generated for this iteration; on exit
        the MCP session for that id is cleaned up and the id is cleared.
        """
        self._task_run = None

        # Only the iteration that created the run id is responsible for tearing it down.
        owns_run_context = get_agent_run_id() is None
        if owns_run_context:
            set_agent_run_id(generate_agent_run_id())

        try:
            stream = self._adapter._prepare_stream(self._input, self._prior_trace)

            # Events of any other type are consumed but not forwarded.
            async for chunk in stream:
                if isinstance(chunk, ModelResponseStream):
                    yield chunk

            self._task_run = self._adapter._finalize_stream(
                stream, self._input, self._input_source, self._parent_task_run
            )
        finally:
            if owns_run_context:
                try:
                    active_run_id = get_agent_run_id()
                    if active_run_id:
                        await MCPSessionManager.shared().cleanup_session(
                            active_run_id
                        )
                finally:
                    # Clear the id even if session cleanup fails.
                    clear_agent_run_id()
Async-iterable wrapper around the OpenAI streaming flow.
Yields ModelResponseStream chunks. After iteration the resulting
TaskRun is available via the .task_run property.
When return_on_tool_call=True and the model requests tool calls, the stream
will stop and task_run.is_toolcall_pending will be True.
def __init__(
    self,
    adapter: BaseAdapter,
    input: InputType,
    input_source: DataSource | None,
    prior_trace: list[ChatCompletionMessageParam] | None,
    parent_task_run: TaskRun | None = None,
) -> None:
    """Store the run arguments on the instance; no work is started here.

    Args:
        adapter: Stored as ``_adapter``.
        input: Stored as ``_input``.
        input_source: Stored as ``_input_source``; may be ``None``.
        prior_trace: Stored as ``_prior_trace``; may be ``None``.
        parent_task_run: Stored as ``_parent_task_run``; defaults to ``None``.
    """
    # No result is available until one is assigned to this slot later.
    self._task_run: TaskRun | None = None
    self._adapter = adapter
    self._input = input
    self._input_source = input_source
    self._prior_trace = prior_trace
    self._parent_task_run = parent_task_run
class AiSdkStreamResult:
    """Async-iterable wrapper that streams a run as AI SDK events.

    Iterating yields ``AiSdkStreamEvent`` instances. Once iteration has
    finished, the resulting ``TaskRun`` can be read from ``.task_run``.

    When return_on_tool_call=True and the model requests tool calls, the
    FINISH event carries finishReason: "tool-calls" and
    ``task_run.is_toolcall_pending`` will be True.
    """

    def __init__(
        self,
        adapter: BaseAdapter,
        input: InputType,
        input_source: DataSource | None,
        prior_trace: list[ChatCompletionMessageParam] | None,
        parent_task_run: TaskRun | None = None,
    ) -> None:
        """Store the run arguments; nothing is executed until iteration."""
        # Filled in only after the stream has been fully consumed.
        self._task_run: TaskRun | None = None
        self._adapter = adapter
        self._input = input
        self._input_source = input_source
        self._prior_trace = prior_trace
        self._parent_task_run = parent_task_run

    @property
    def task_run(self) -> TaskRun:
        """The finished ``TaskRun``.

        Raises:
            RuntimeError: If the stream has not been fully iterated yet.
        """
        if self._task_run is not None:
            return self._task_run
        raise RuntimeError(
            "Stream has not been fully consumed yet. "
            "Iterate over the stream before accessing .task_run"
        )

    async def __aiter__(self) -> AsyncIterator[AiSdkStreamEvent]:
        """Run the adapter stream and yield it as AI SDK events.

        Emits a start event and a start-step event, then the converted
        events for each adapter event, then events closing any open blocks,
        a finish-step event, and finally either a "tool-calls" finish event
        or the converter's own final events.

        Any previously stored ``TaskRun`` is discarded at the start. If no
        agent run id is active, one is generated for this iteration; on exit
        the MCP session for that id is cleaned up and the id is cleared.
        """
        self._task_run = None

        # Only the iteration that created the run id is responsible for tearing it down.
        owns_run_context = get_agent_run_id() is None
        if owns_run_context:
            set_agent_run_id(generate_agent_run_id())

        try:
            stream = self._adapter._prepare_stream(self._input, self._prior_trace)

            message_id = f"msg-{uuid.uuid4().hex}"
            converter = AiSdkStreamConverter()

            yield StartEvent(messageId=message_id)
            yield StartStepEvent()

            # Set after a tool event so the converter is reset before the
            # next model chunk is converted.
            pending_step_reset = False
            async for event in stream:
                if isinstance(event, ModelResponseStream):
                    if pending_step_reset:
                        converter.reset_for_next_step()
                        pending_step_reset = False
                    for converted in converter.convert_chunk(event):
                        yield converted
                elif isinstance(event, ToolCallEvent):
                    pending_step_reset = True
                    for converted in converter.convert_tool_event(event):
                        yield converted

            for converted in converter.close_open_blocks():
                yield converted

            yield FinishStepEvent()

            self._task_run = self._adapter._finalize_stream(
                stream, self._input, self._input_source, self._parent_task_run
            )

            if self._task_run.is_toolcall_pending:
                yield FinishEvent(
                    messageMetadata=FinishMessageMetadata(finishReason="tool-calls"),
                )
            else:
                for converted in converter.finalize():
                    yield converted
        finally:
            if owns_run_context:
                try:
                    active_run_id = get_agent_run_id()
                    if active_run_id:
                        await MCPSessionManager.shared().cleanup_session(
                            active_run_id
                        )
                finally:
                    # Clear the id even if session cleanup fails.
                    clear_agent_run_id()
Async-iterable wrapper around the AI SDK streaming flow.
Yields AiSdkStreamEvent instances. After iteration the resulting
TaskRun is available via the .task_run property.
When return_on_tool_call=True and the model requests tool calls, the FINISH
event will have finishReason: "tool-calls" and task_run.is_toolcall_pending
will be True.
def __init__(
    self,
    adapter: BaseAdapter,
    input: InputType,
    input_source: DataSource | None,
    prior_trace: list[ChatCompletionMessageParam] | None,
    parent_task_run: TaskRun | None = None,
) -> None:
    """Store the run arguments on the instance; no work is started here.

    Args:
        adapter: Stored as ``_adapter``.
        input: Stored as ``_input``.
        input_source: Stored as ``_input_source``; may be ``None``.
        prior_trace: Stored as ``_prior_trace``; may be ``None``.
        parent_task_run: Stored as ``_parent_task_run``; defaults to ``None``.
    """
    # No result is available until one is assigned to this slot later.
    self._task_run: TaskRun | None = None
    self._adapter = adapter
    self._input = input
    self._input_source = input_source
    self._prior_trace = prior_trace
    self._parent_task_run = parent_task_run