kiln_ai.adapters.fine_tune.openai_finetune
```python
import time

import openai
from openai.types.fine_tuning import FineTuningJob

from kiln_ai.adapters.fine_tune.base_finetune import (
    BaseFinetuneAdapter,
    FineTuneParameter,
    FineTuneStatus,
    FineTuneStatusType,
)
from kiln_ai.adapters.fine_tune.dataset_formatter import DatasetFormat, DatasetFormatter
from kiln_ai.datamodel import DatasetSplit, StructuredOutputMode, Task
from kiln_ai.utils.config import Config


def _get_openai_client():
    key = Config.shared().open_ai_api_key
    if not key:
        raise RuntimeError(
            "OpenAI API key not set. You must connect OpenAI in settings."
        )
    return openai.AsyncOpenAI(
        api_key=key,
    )


class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.
        """

        # Update the datamodel with the latest status if it has changed
        status = await self._status()
        if status.status != self.datamodel.latest_status:
            self.datamodel.latest_status = status.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return status

    async def _status(self) -> FineTuneStatus:
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Will raise an error if the job is not found, or for other issues
            oai_client = _get_openai_client()
            response = await oai_client.fine_tuning.jobs.retrieve(
                self.datamodel.provider_id
            )

            # If the fine-tuned model has been updated, update the datamodel
            try:
                if (
                    self.datamodel.fine_tune_model_id != response.fine_tuned_model
                    or self.datamodel.base_model_id != response.model
                ):
                    self.datamodel.fine_tune_model_id = response.fine_tuned_model
                    self.datamodel.base_model_id = response.model
                    self.datamodel.save_to_file()
            except Exception:
                # Don't let this error crash the status call
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{e!s}]",
            )

        if not response or not isinstance(response, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )
        if response.error and response.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{response.error.message} [Code: {response.error.code}]",
            )
        status = response.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ["validating_files", "running", "queued"]:
            time_to_finish_msg: str | None = None
            if response.estimated_finish is not None:
                time_to_finish_msg = f"Estimated finish time: {int(response.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine tune job is running [{status}]. {time_to_finish_msg or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        # Use chat format for unstructured output, and JSON for formatted output (was previously function calls)
        format = DatasetFormat.OPENAI_CHAT_JSONL
        if task.output_json_schema:
            format = DatasetFormat.OPENAI_CHAT_JSON_SCHEMA_JSONL
            if self.datamodel.run_config is not None:
                self.datamodel.run_config.structured_output_mode = (
                    StructuredOutputMode.json_schema
                )
        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task, format
        )
        validation_file_id = None
        if self.datamodel.validation_split_name:
            validation_file_id = await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task, format
            )

        # Filter to hyperparameters which are set via the hyperparameters field (some like seed are set via the API)
        hyperparameters = {
            k: v
            for k, v in self.datamodel.parameters.items()
            if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
        }

        oai_client = _get_openai_client()
        ft = await oai_client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = ft.id
        self.datamodel.fine_tune_model_id = ft.fine_tuned_model
        # Model can get more specific after fine-tune call (gpt-4o-mini to gpt-4o-mini-2024-07-18) so we update it in the datamodel
        self.datamodel.base_model_id = ft.model

        return None

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task, format: DatasetFormat
    ) -> str:
        formatter = DatasetFormatter(
            dataset, self.datamodel.system_message, self.datamodel.thinking_instructions
        )
        path = await formatter.dump_to_file(
            split_name, format, self.datamodel.data_strategy
        )

        oai_client = _get_openai_client()
        response = await oai_client.files.create(
            file=open(path, "rb"),
            purpose="fine-tune",
        )
        id = response.id
        if not id:
            raise ValueError("Failed to upload file to OpenAI")
        return id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        return [
            FineTuneParameter(
                name="batch_size",
                type="int",
                description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'",
            ),
            FineTuneParameter(
                name="learning_rate_multiplier",
                type="float",
                description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="n_epochs",
                type="int",
                description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="seed",
                type="int",
                description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
                optional=True,
            ),
        ]
```
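Note how `_start` splits `datamodel.parameters` in two: `n_epochs`, `learning_rate_multiplier`, and `batch_size` travel inside the OpenAI `hyperparameters` field, while `seed` is passed as a top-level argument to `fine_tuning.jobs.create`. A minimal, self-contained sketch of that split (the parameter values here are hypothetical):

```python
# Hypothetical user-supplied parameters; keys mirror available_parameters() below.
parameters = {"n_epochs": 3, "learning_rate_multiplier": 0.5, "seed": 42}

# Same filter as _start: only these keys go in the hyperparameters field.
hyperparameters = {
    k: v
    for k, v in parameters.items()
    if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
}
# seed is set separately, as a top-level API argument.
seed = parameters.get("seed")

assert hyperparameters == {"n_epochs": 3, "learning_rate_multiplier": 0.5}
assert seed == 42
```

Also note that every API call goes through `_get_openai_client()`, which raises `RuntimeError` if no OpenAI key has been saved in Kiln settings, so callers should be prepared for that error before any fine-tune work starts.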
class OpenAIFinetune(BaseFinetuneAdapter):
A fine-tuning adapter for OpenAI.
async def status(self) -> FineTuneStatus:
Get the status of the fine-tune.
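A sketch of polling this method until the job settles, assuming `finetune` is an already-constructed `OpenAIFinetune` (construction is handled by `BaseFinetuneAdapter` and not shown here). Per `_status` above, `completed` and `failed` are terminal; `pending`, `running`, and `unknown` are worth re-polling. The 30-second interval is an arbitrary choice:

```python
import asyncio

from kiln_ai.adapters.fine_tune.base_finetune import FineTuneStatusType


async def wait_for_finetune(finetune) -> None:
    # Poll until the job reaches a terminal state. Each status() call also
    # persists any status change back to the datamodel file.
    while True:
        status = await finetune.status()
        print(f"{status.status}: {status.message}")
        if status.status in (FineTuneStatusType.completed, FineTuneStatusType.failed):
            return
        await asyncio.sleep(30)
```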
async def generate_and_upload_jsonl(self, dataset: kiln_ai.datamodel.DatasetSplit, split_name: str, task: kiln_ai.datamodel.Task, format: kiln_ai.adapters.fine_tune.dataset_formatter.DatasetFormat) -> str:
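This method has no docstring in the source; from the body above, it dumps the named split to a JSONL file via `DatasetFormatter`, uploads it with `purpose="fine-tune"`, and returns the OpenAI file ID. A hedged sketch of just the upload half, using only the `openai` calls the method itself uses (the function name and parameters here are illustrative, not part of the module):

```python
import openai


async def upload_jsonl(path: str, api_key: str) -> str:
    # Mirror of the upload step in generate_and_upload_jsonl: send the JSONL
    # file to OpenAI with the "fine-tune" purpose and return its file ID.
    client = openai.AsyncOpenAI(api_key=api_key)
    with open(path, "rb") as f:
        response = await client.files.create(file=f, purpose="fine-tune")
    if not response.id:
        raise ValueError("Failed to upload file to OpenAI")
    return response.id
```

The sketch wraps the file in a context manager so the handle is closed after the upload; the source passes the open handle to `files.create` directly.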
@classmethod
def available_parameters(cls) -> list[kiln_ai.adapters.fine_tune.base_finetune.FineTuneParameter]:
Returns a list of parameters that can be provided for this fine-tune, including hyperparameters (batch size, learning rate multiplier, epoch count) and the random seed.
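For example, a caller could enumerate the parameters to build a settings form or validate user input. A sketch; `name`, `type`, and `description` are set on every entry above, while `optional` is assumed to take a model default where it is omitted (the `batch_size` entry):

```python
from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune

for param in OpenAIFinetune.available_parameters():
    # getattr hedges the case where optional relies on a field default.
    required = "optional" if getattr(param, "optional", False) else "required"
    print(f"{param.name} ({param.type}, {required}): {param.description}")
```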