kiln_ai.adapters.base_adapter

import json
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict

from kiln_ai.datamodel import (
    DataSource,
    DataSourceType,
    Task,
    TaskOutput,
    TaskRun,
)
from kiln_ai.datamodel.json_schema import validate_schema
from kiln_ai.utils.config import Config

from .prompt_builders import BasePromptBuilder, SimplePromptBuilder


@dataclass
class AdapterInfo:
    adapter_name: str
    model_name: str
    model_provider: str
    prompt_builder_name: str


@dataclass
class RunOutput:
    output: Dict | str
    intermediate_outputs: Dict[str, str] | None


class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.

    Attributes:
        prompt_builder (BasePromptBuilder): Builder for constructing prompts for the model
        kiln_task (Task): The task configuration and metadata
        output_schema (dict | None): JSON schema for validating structured outputs
        input_schema (dict | None): JSON schema for validating structured inputs
    """

    def __init__(
        self,
        kiln_task: Task,
        prompt_builder: BasePromptBuilder | None = None,
        tags: list[str] | None = None,
    ):
        self.prompt_builder = prompt_builder or SimplePromptBuilder(kiln_task)
        self.kiln_task = kiln_task
        self.output_schema = self.kiln_task.output_json_schema
        self.input_schema = self.kiln_task.input_json_schema
        self.default_tags = tags

    async def invoke_returning_raw(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> Dict | str:
        result = await self.invoke(input, input_source)
        if self.kiln_task.output_json_schema is None:
            return result.output.output
        else:
            return json.loads(result.output.output)

    async def invoke(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        # validate input
        if self.input_schema is not None:
            if not isinstance(input, dict):
                raise ValueError(f"structured input is not a dict: {input}")
            validate_schema(input, self.input_schema)

        # Run
        run_output = await self._run(input)

        # validate output
        if self.output_schema is not None:
            if not isinstance(run_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {run_output.output}"
                )
            validate_schema(run_output.output, self.output_schema)
        else:
            if not isinstance(run_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {run_output.output}"
                )

        # Generate the run and output
        run = self.generate_run(input, input_source, run_output)

        # Save the run if configured to do so, and we have a path to save to
        if Config.shared().autosave_runs and self.kiln_task.path is not None:
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run

    def has_structured_output(self) -> bool:
        return self.output_schema is not None

    @abstractmethod
    def adapter_info(self) -> AdapterInfo:
        pass

    @abstractmethod
    async def _run(self, input: Dict | str) -> RunOutput:
        pass

    def build_prompt(self) -> str:
        return self.prompt_builder.build_prompt()

    # create a run and task output
    def generate_run(
        self, input: Dict | str, input_source: DataSource | None, run_output: RunOutput
    ) -> TaskRun:
        # Convert input and output to JSON strings if they are dictionaries
        input_str = json.dumps(input) if isinstance(input, dict) else input
        output_str = (
            json.dumps(run_output.output)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        new_task_run = TaskRun(
            parent=self.kiln_task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                # Synthetic since an adapter, not a human, is creating this
                source=DataSource(
                    type=DataSourceType.synthetic,
                    properties=self._properties_for_task_output(),
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.default_tags or [],
        )

        exclude_fields = {
            "id": True,
            "created_at": True,
            "updated_at": True,
            "path": True,
            "output": {"id": True, "created_at": True, "updated_at": True},
        }
        new_run_dump = new_task_run.model_dump(exclude=exclude_fields)

        # Check if the same run already exists
        existing_task_run = next(
            (
                task_run
                for task_run in self.kiln_task.runs()
                if task_run.model_dump(exclude=exclude_fields) == new_run_dump
            ),
            None,
        )
        if existing_task_run:
            return existing_task_run

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        props = {}

        # adapter info
        adapter_info = self.adapter_info()
        props["adapter_name"] = adapter_info.adapter_name
        props["model_name"] = adapter_info.model_name
        props["model_provider"] = adapter_info.model_provider
        props["prompt_builder_name"] = adapter_info.prompt_builder_name

        return props
@dataclass
class AdapterInfo:
AdapterInfo(adapter_name: str, model_name: str, model_provider: str, prompt_builder_name: str)
adapter_name: str
model_name: str
model_provider: str
prompt_builder_name: str
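
A minimal construction sketch; the field values below are hypothetical and simply identify which adapter, model, provider, and prompt builder produced a run:

info = AdapterInfo(
    adapter_name="langchain_adapter",         # hypothetical adapter name
    model_name="example-model",               # hypothetical model name
    model_provider="example-provider",        # hypothetical provider name
    prompt_builder_name="simple_prompt_builder",
)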
@dataclass
class RunOutput:
RunOutput(output: Union[Dict, str], intermediate_outputs: Optional[Dict[str, str]])
output: Union[Dict, str]
intermediate_outputs: Optional[Dict[str, str]]
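
A sketch of the two shapes a RunOutput can take: a plain string for unstructured tasks, or a dict for structured tasks, optionally with named intermediate outputs (for example, a reasoning trace). The key and field values are illustrative:

plain = RunOutput(output="Hello!", intermediate_outputs=None)
structured = RunOutput(
    output={"greeting": "Hello!", "language": "en"},  # must match the task's output schema
    intermediate_outputs={"reasoning": "A short greeting in English was requested."},
)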
class BaseAdapter:

Base class for AI model adapters that handle task execution.

This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.

Attributes:
    prompt_builder (BasePromptBuilder): Builder for constructing prompts for the model
    kiln_task (Task): The task configuration and metadata
    output_schema (dict | None): JSON schema for validating structured outputs
    input_schema (dict | None): JSON schema for validating structured inputs

prompt_builder
kiln_task
output_schema
input_schema
default_tags
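
A minimal subclass sketch, assuming a toy model that simply echoes its input. Only the two abstract members (_run and adapter_info) need to be implemented; validation, prompt building, and run tracking are inherited. EchoAdapter and all of its names are hypothetical:

from typing import Dict

from kiln_ai.adapters.base_adapter import AdapterInfo, BaseAdapter, RunOutput


class EchoAdapter(BaseAdapter):
    """Hypothetical adapter that echoes the input instead of calling a model."""

    async def _run(self, input: Dict | str) -> RunOutput:
        # A real adapter would send the built prompt and input to a model here.
        return RunOutput(output=f"echo: {input}", intermediate_outputs=None)

    def adapter_info(self) -> AdapterInfo:
        return AdapterInfo(
            adapter_name="echo_adapter",   # hypothetical
            model_name="echo-1",           # hypothetical
            model_provider="local",        # hypothetical
            # Class name used as a stand-in; the real naming convention may differ.
            prompt_builder_name=type(self.prompt_builder).__name__,
        )

With a Task in hand, the adapter is constructed as EchoAdapter(task) and driven through invoke or invoke_returning_raw, as sketched under those methods below.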
async def invoke_returning_raw(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Union[Dict, str]:
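
A usage sketch, assuming the hypothetical EchoAdapter above and an already-constructed Task. For tasks without an output schema the raw value is the output string; otherwise it is the dict parsed from the stored JSON output:

import asyncio


async def main() -> None:
    adapter = EchoAdapter(task)  # task: a kiln_ai.datamodel.Task built elsewhere
    raw = await adapter.invoke_returning_raw("world")
    print(raw)                   # "echo: world" for a plain-text task


asyncio.run(main())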
async def invoke(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
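
A sketch of inspecting the TaskRun returned by invoke, inside an async context and using the hypothetical adapter above. When autosave_runs is disabled or the task has no path, the run's id is cleared to signal it was not persisted:

run = await adapter.invoke("world")                 # pass a dict instead when the task defines an input schema
print(run.output.output)                            # output is stored as a string (JSON for structured tasks)
print(run.output.source.properties["model_name"])   # adapter/model metadata recorded on the synthetic source
if run.id is None:
    print("run was not saved to disk")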
def has_structured_output(self) -> bool:
@abstractmethod
def adapter_info(self) -> AdapterInfo:
def build_prompt(self) -> str:
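
A one-line sketch: the prompt the adapter will use can be inspected before invoking; by default it comes from a SimplePromptBuilder built around the task:

print(adapter.build_prompt())  # text produced by the configured prompt builder for this task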
def generate_run(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None, run_output: RunOutput) -> kiln_ai.datamodel.TaskRun:
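
A sketch of calling generate_run directly (it is normally called from invoke), again assuming the hypothetical EchoAdapter. With input_source=None the input is attributed to the configured user as a human DataSource, while the output source is always synthetic; if an identical run already exists on the task (ignoring ids, timestamps, and paths), that existing run is returned instead of the new one:

from kiln_ai.datamodel import DataSourceType

run_output = RunOutput(output="echo: world", intermediate_outputs=None)
run = adapter.generate_run("world", None, run_output)
assert run.output.source.type == DataSourceType.synthetic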