kiln_ai.adapters.base_adapter
import json
from abc import ABCMeta, abstractmethod
from dataclasses import dataclass
from typing import Dict

from kiln_ai.datamodel import (
    DataSource,
    DataSourceType,
    Task,
    TaskOutput,
    TaskRun,
)
from kiln_ai.datamodel.json_schema import validate_schema
from kiln_ai.utils.config import Config

from .prompt_builders import BasePromptBuilder, SimplePromptBuilder


@dataclass
class AdapterInfo:
    adapter_name: str
    model_name: str
    model_provider: str
    prompt_builder_name: str


@dataclass
class RunOutput:
    output: Dict | str
    intermediate_outputs: Dict[str, str] | None


class BaseAdapter(metaclass=ABCMeta):
    """Base class for AI model adapters that handle task execution.

    This abstract class provides the foundation for implementing model-specific adapters
    that can process tasks with structured or unstructured inputs/outputs. It handles
    input/output validation, prompt building, and run tracking.

    Attributes:
        prompt_builder (BasePromptBuilder): Builder for constructing prompts for the model
        kiln_task (Task): The task configuration and metadata
        output_schema (dict | None): JSON schema for validating structured outputs
        input_schema (dict | None): JSON schema for validating structured inputs
    """

    def __init__(
        self,
        kiln_task: Task,
        prompt_builder: BasePromptBuilder | None = None,
        tags: list[str] | None = None,
    ):
        self.prompt_builder = prompt_builder or SimplePromptBuilder(kiln_task)
        self.kiln_task = kiln_task
        self.output_schema = self.kiln_task.output_json_schema
        self.input_schema = self.kiln_task.input_json_schema
        self.default_tags = tags

    async def invoke_returning_raw(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> Dict | str:
        result = await self.invoke(input, input_source)
        if self.kiln_task.output_json_schema is None:
            return result.output.output
        else:
            return json.loads(result.output.output)

    async def invoke(
        self,
        input: Dict | str,
        input_source: DataSource | None = None,
    ) -> TaskRun:
        # validate input
        if self.input_schema is not None:
            if not isinstance(input, dict):
                raise ValueError(f"structured input is not a dict: {input}")
            validate_schema(input, self.input_schema)

        # Run
        run_output = await self._run(input)

        # validate output
        if self.output_schema is not None:
            if not isinstance(run_output.output, dict):
                raise RuntimeError(
                    f"structured response is not a dict: {run_output.output}"
                )
            validate_schema(run_output.output, self.output_schema)
        else:
            if not isinstance(run_output.output, str):
                raise RuntimeError(
                    f"response is not a string for non-structured task: {run_output.output}"
                )

        # Generate the run and output
        run = self.generate_run(input, input_source, run_output)

        # Save the run if configured to do so, and we have a path to save to
        if Config.shared().autosave_runs and self.kiln_task.path is not None:
            run.save_to_file()
        else:
            # Clear the ID to indicate it's not persisted
            run.id = None

        return run

    def has_structured_output(self) -> bool:
        return self.output_schema is not None

    @abstractmethod
    def adapter_info(self) -> AdapterInfo:
        pass

    @abstractmethod
    async def _run(self, input: Dict | str) -> RunOutput:
        pass

    def build_prompt(self) -> str:
        return self.prompt_builder.build_prompt()

    # create a run and task output
    def generate_run(
        self, input: Dict | str, input_source: DataSource | None, run_output: RunOutput
    ) -> TaskRun:
        # Convert input and output to JSON strings if they are dictionaries
        input_str = json.dumps(input) if isinstance(input, dict) else input
        output_str = (
            json.dumps(run_output.output)
            if isinstance(run_output.output, dict)
            else run_output.output
        )

        # If no input source is provided, use the human data source
        if input_source is None:
            input_source = DataSource(
                type=DataSourceType.human,
                properties={"created_by": Config.shared().user_id},
            )

        new_task_run = TaskRun(
            parent=self.kiln_task,
            input=input_str,
            input_source=input_source,
            output=TaskOutput(
                output=output_str,
                # Synthetic since an adapter, not a human, is creating this
                source=DataSource(
                    type=DataSourceType.synthetic,
                    properties=self._properties_for_task_output(),
                ),
            ),
            intermediate_outputs=run_output.intermediate_outputs,
            tags=self.default_tags or [],
        )

        exclude_fields = {
            "id": True,
            "created_at": True,
            "updated_at": True,
            "path": True,
            "output": {"id": True, "created_at": True, "updated_at": True},
        }
        new_run_dump = new_task_run.model_dump(exclude=exclude_fields)

        # Check if the same run already exists
        existing_task_run = next(
            (
                task_run
                for task_run in self.kiln_task.runs()
                if task_run.model_dump(exclude=exclude_fields) == new_run_dump
            ),
            None,
        )
        if existing_task_run:
            return existing_task_run

        return new_task_run

    def _properties_for_task_output(self) -> Dict[str, str | int | float]:
        props = {}

        # adapter info
        adapter_info = self.adapter_info()
        props["adapter_name"] = adapter_info.adapter_name
        props["model_name"] = adapter_info.model_name
        props["model_provider"] = adapter_info.model_provider
        props["prompt_builder_name"] = adapter_info.prompt_builder_name

        return props
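The two abstract members above, _run and adapter_info, are all a concrete adapter has to supply. Below is a minimal sketch of such a subclass for a task without an output schema; the echo behaviour and every name in it (EchoAdapter, the "echo-1" model, the "local" provider) are illustrative assumptions, not part of kiln_ai.

from typing import Dict

from kiln_ai.adapters.base_adapter import AdapterInfo, BaseAdapter, RunOutput


class EchoAdapter(BaseAdapter):
    """Illustrative adapter that echoes the prompt and input back (no real model call)."""

    def adapter_info(self) -> AdapterInfo:
        return AdapterInfo(
            adapter_name="echo_adapter",  # hypothetical name
            model_name="echo-1",  # hypothetical model
            model_provider="local",  # hypothetical provider
            prompt_builder_name=type(self.prompt_builder).__name__,
        )

    async def _run(self, input: Dict | str) -> RunOutput:
        # A real adapter would send build_prompt() and the input to a model here;
        # returning a plain string satisfies the contract for unstructured tasks.
        prompt = self.build_prompt()
        return RunOutput(output=f"{prompt}\n\n{input}", intermediate_outputs=None)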
@dataclass
class AdapterInfo:
@dataclass
class RunOutput:
class BaseAdapter:
Base class for AI model adapters that handle task execution.
This abstract class provides the foundation for implementing model-specific adapters that can process tasks with structured or unstructured inputs/outputs. It handles input/output validation, prompt building, and run tracking.
Attributes:
    prompt_builder (BasePromptBuilder): Builder for constructing prompts for the model
    kiln_task (Task): The task configuration and metadata
    output_schema (dict | None): JSON schema for validating structured outputs
    input_schema (dict | None): JSON schema for validating structured inputs
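A construction sketch, assuming `task` is an existing kiln_ai.datamodel.Task loaded elsewhere and EchoAdapter is the illustrative subclass from the sketch near the top of this page; both prompt_builder and tags are optional.

from kiln_ai.adapters.prompt_builders import SimplePromptBuilder

# `task` is assumed to be a kiln_ai.datamodel.Task that already exists.
adapter = EchoAdapter(
    kiln_task=task,
    prompt_builder=SimplePromptBuilder(task),  # optional; this is also the default
    tags=["docs-example"],  # optional; copied onto every TaskRun the adapter creates
)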
async def invoke_returning_raw(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> Union[Dict, str]:
async def invoke_returning_raw(
    self,
    input: Dict | str,
    input_source: DataSource | None = None,
) -> Dict | str:
    result = await self.invoke(input, input_source)
    if self.kiln_task.output_json_schema is None:
        return result.output.output
    else:
        return json.loads(result.output.output)
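A usage sketch, reusing the hypothetical `adapter` built above: for a task without an output schema the raw value is the model's string output; for a task with an output schema it is the output already parsed into a dict.

import asyncio


async def main() -> None:
    # Plain string for unstructured tasks; parsed dict when the task defines
    # an output JSON schema.
    raw = await adapter.invoke_returning_raw("Summarize the release notes.")
    print(raw)


asyncio.run(main())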
async def invoke(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None = None) -> kiln_ai.datamodel.TaskRun:
async def invoke(
    self,
    input: Dict | str,
    input_source: DataSource | None = None,
) -> TaskRun:
    # validate input
    if self.input_schema is not None:
        if not isinstance(input, dict):
            raise ValueError(f"structured input is not a dict: {input}")
        validate_schema(input, self.input_schema)

    # Run
    run_output = await self._run(input)

    # validate output
    if self.output_schema is not None:
        if not isinstance(run_output.output, dict):
            raise RuntimeError(
                f"structured response is not a dict: {run_output.output}"
            )
        validate_schema(run_output.output, self.output_schema)
    else:
        if not isinstance(run_output.output, str):
            raise RuntimeError(
                f"response is not a string for non-structured task: {run_output.output}"
            )

    # Generate the run and output
    run = self.generate_run(input, input_source, run_output)

    # Save the run if configured to do so, and we have a path to save to
    if Config.shared().autosave_runs and self.kiln_task.path is not None:
        run.save_to_file()
    else:
        # Clear the ID to indicate it's not persisted
        run.id = None

    return run
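A sketch of a full invocation, again using the hypothetical `adapter`; the explicit input_source is optional and defaults to a human source attributed to the configured user.

import asyncio

from kiln_ai.datamodel import DataSource, DataSourceType
from kiln_ai.utils.config import Config


async def main() -> None:
    run = await adapter.invoke(
        "Summarize the release notes.",  # pass a dict matching the input schema for structured tasks
        input_source=DataSource(
            type=DataSourceType.human,
            properties={"created_by": Config.shared().user_id},
        ),
    )
    # run.id is None unless autosave_runs is enabled and the task has a path.
    print(run.id, run.output.output)


asyncio.run(main())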
def generate_run(self, input: Union[Dict, str], input_source: kiln_ai.datamodel.DataSource | None, run_output: RunOutput) -> kiln_ai.datamodel.TaskRun:
def generate_run(
    self, input: Dict | str, input_source: DataSource | None, run_output: RunOutput
) -> TaskRun:
    # Convert input and output to JSON strings if they are dictionaries
    input_str = json.dumps(input) if isinstance(input, dict) else input
    output_str = (
        json.dumps(run_output.output)
        if isinstance(run_output.output, dict)
        else run_output.output
    )

    # If no input source is provided, use the human data source
    if input_source is None:
        input_source = DataSource(
            type=DataSourceType.human,
            properties={"created_by": Config.shared().user_id},
        )

    new_task_run = TaskRun(
        parent=self.kiln_task,
        input=input_str,
        input_source=input_source,
        output=TaskOutput(
            output=output_str,
            # Synthetic since an adapter, not a human, is creating this
            source=DataSource(
                type=DataSourceType.synthetic,
                properties=self._properties_for_task_output(),
            ),
        ),
        intermediate_outputs=run_output.intermediate_outputs,
        tags=self.default_tags or [],
    )

    exclude_fields = {
        "id": True,
        "created_at": True,
        "updated_at": True,
        "path": True,
        "output": {"id": True, "created_at": True, "updated_at": True},
    }
    new_run_dump = new_task_run.model_dump(exclude=exclude_fields)

    # Check if the same run already exists
    existing_task_run = next(
        (
            task_run
            for task_run in self.kiln_task.runs()
            if task_run.model_dump(exclude=exclude_fields) == new_run_dump
        ),
        None,
    )
    if existing_task_run:
        return existing_task_run

    return new_task_run
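Because the comparison excludes ids, timestamps, and paths, generating the same run twice resolves to the already-saved TaskRun rather than creating a duplicate. A rough sketch of that behaviour, assuming the hypothetical `adapter` from above and a task that has been saved to disk:

from kiln_ai.adapters.base_adapter import RunOutput

run_output = RunOutput(output="hello", intermediate_outputs=None)

first = adapter.generate_run("hi", None, run_output)
first.save_to_file()  # persist under the task (requires the task to have a path)

# The second, identical run dumps to the same payload (ids/timestamps excluded),
# so the existing saved run is returned instead of a fresh TaskRun.
second = adapter.generate_run("hi", None, run_output)
assert second.id == first.id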