kiln_ai.adapters.fine_tune.openai_finetune
import time

import openai
from openai.types.fine_tuning import FineTuningJob

from kiln_ai.adapters.fine_tune.base_finetune import (
    BaseFinetuneAdapter,
    FineTuneParameter,
    FineTuneStatus,
    FineTuneStatusType,
)
from kiln_ai.adapters.fine_tune.dataset_formatter import DatasetFormat, DatasetFormatter
from kiln_ai.datamodel import DatasetSplit, StructuredOutputMode, Task
from kiln_ai.utils.config import Config


def _get_openai_client() -> openai.AsyncOpenAI:
    """Create an async OpenAI client using the API key from the shared config.

    Raises:
        RuntimeError: If no OpenAI API key has been configured.
    """
    key = Config.shared().open_ai_api_key
    if not key:
        raise RuntimeError(
            "OpenAI API key not set. You must connect OpenAI in settings."
        )
    return openai.AsyncOpenAI(
        api_key=key,
    )


class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.

        Persists the latest status to the datamodel file when it changes.
        """

        # Update the datamodel with the latest status if it has changed
        status = await self._status()
        if status.status != self.datamodel.latest_status:
            self.datamodel.latest_status = status.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return status

    async def _status(self) -> FineTuneStatus:
        """Fetch the fine-tune job from OpenAI and map it to a FineTuneStatus.

        Expected OpenAI API errors are caught and mapped to unknown/failed
        statuses rather than propagated.
        """
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Will raise an error if the job is not found, or for other issues
            oai_client = _get_openai_client()
            response = await oai_client.fine_tuning.jobs.retrieve(
                self.datamodel.provider_id
            )

            # If the fine-tuned model has been updated, update the datamodel
            try:
                if (
                    self.datamodel.fine_tune_model_id != response.fine_tuned_model
                    or self.datamodel.base_model_id != response.model
                ):
                    self.datamodel.fine_tune_model_id = response.fine_tuned_model
                    self.datamodel.base_model_id = response.model
                    self.datamodel.save_to_file()
            except Exception:
                # Saving is best-effort: don't let this error crash the status call
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{e!s}]",
            )

        if not response or not isinstance(response, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )
        if response.error and response.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{response.error.message} [Code: {response.error.code}]",
            )
        status = response.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ["validating_files", "running", "queued"]:
            time_to_finish_msg: str | None = None
            if response.estimated_finish is not None:
                time_to_finish_msg = f"Estimated finish time: {int(response.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine tune job is running [{status}]. {time_to_finish_msg or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        """Upload the dataset splits and create the OpenAI fine-tuning job.

        Raises:
            ValueError: If the datamodel has no parent task.
        """
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        # Use chat format for unstructured output, and JSON for formatted output (was previously function calls)
        format = DatasetFormat.OPENAI_CHAT_JSONL
        if task.output_json_schema:
            format = DatasetFormat.OPENAI_CHAT_JSON_SCHEMA_JSONL
            self.datamodel.structured_output_mode = StructuredOutputMode.json_schema
        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task, format
        )
        validation_file_id = None
        if self.datamodel.validation_split_name:
            validation_file_id = await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task, format
            )

        # Filter to hyperparameters which are set via the hyperparameters field (some like seed are set via the API)
        hyperparameters = {
            k: v
            for k, v in self.datamodel.parameters.items()
            if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
        }

        oai_client = _get_openai_client()
        ft = await oai_client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = ft.id
        self.datamodel.fine_tune_model_id = ft.fine_tuned_model
        # Model can get more specific after fine-tune call (gpt-4o-mini to gpt-4o-mini-2024-07-18) so we update it in the datamodel
        self.datamodel.base_model_id = ft.model

        return None

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task, format: DatasetFormat
    ) -> str:
        """Dump a dataset split to a JSONL file and upload it to OpenAI.

        Returns:
            The OpenAI file ID of the uploaded file.

        Raises:
            ValueError: If the upload response does not contain a file ID.
        """
        formatter = DatasetFormatter(
            dataset, self.datamodel.system_message, self.datamodel.thinking_instructions
        )
        path = formatter.dump_to_file(split_name, format, self.datamodel.data_strategy)

        oai_client = _get_openai_client()
        # Context manager ensures the file handle is closed after the upload
        # (previously the open handle was leaked until garbage collection).
        with open(path, "rb") as dataset_file:
            response = await oai_client.files.create(
                file=dataset_file,
                purpose="fine-tune",
            )
        file_id = response.id
        if not file_id:
            raise ValueError("Failed to upload file to OpenAI")
        return file_id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        """Returns a list of parameters that can be provided for this fine-tune. Includes hyperparameters, etc."""
        return [
            FineTuneParameter(
                name="batch_size",
                type="int",
                description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'",
            ),
            FineTuneParameter(
                name="learning_rate_multiplier",
                type="float",
                description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="n_epochs",
                type="int",
                description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="seed",
                type="int",
                description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
                optional=True,
            ),
        ]
class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.
        """

        # Refresh from OpenAI and persist any status transition to disk.
        current = await self._status()
        if self.datamodel.latest_status != current.status:
            self.datamodel.latest_status = current.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return current

    async def _status(self) -> FineTuneStatus:
        """Query OpenAI for the job and translate its state to a FineTuneStatus."""
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Retrieval raises for unknown job IDs and other API-level failures.
            client = _get_openai_client()
            job = await client.fine_tuning.jobs.retrieve(self.datamodel.provider_id)

            # Sync any model ID changes back to the datamodel, best-effort.
            try:
                ids_stale = (
                    self.datamodel.fine_tune_model_id != job.fine_tuned_model
                    or self.datamodel.base_model_id != job.model
                )
                if ids_stale:
                    self.datamodel.fine_tune_model_id = job.fine_tuned_model
                    self.datamodel.base_model_id = job.model
                    self.datamodel.save_to_file()
            except Exception:
                # A failed save must not break the status query itself.
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{e!s}]",
            )

        if not job or not isinstance(job, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )

        # An error payload with a code takes precedence over the state field.
        if job.error and job.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{job.error.message} [Code: {job.error.code}]",
            )

        status = job.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ("validating_files", "running", "queued"):
            eta_message: str | None = None
            if job.estimated_finish is not None:
                eta_message = f"Estimated finish time: {int(job.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine tune job is running [{status}]. {eta_message or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        """Upload the dataset splits and kick off the OpenAI fine-tune job."""
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        # Structured-output tasks use the JSON schema format; otherwise plain chat.
        if task.output_json_schema:
            format = DatasetFormat.OPENAI_CHAT_JSON_SCHEMA_JSONL
            self.datamodel.structured_output_mode = StructuredOutputMode.json_schema
        else:
            format = DatasetFormat.OPENAI_CHAT_JSONL

        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task, format
        )
        validation_file_id = (
            await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task, format
            )
            if self.datamodel.validation_split_name
            else None
        )

        # Only these keys belong in the hyperparameters field; seed is a
        # top-level API argument.
        allowed = ("n_epochs", "learning_rate_multiplier", "batch_size")
        hyperparameters = {
            key: value
            for key, value in self.datamodel.parameters.items()
            if key in allowed
        }

        client = _get_openai_client()
        job = await client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = job.id
        self.datamodel.fine_tune_model_id = job.fine_tuned_model
        # OpenAI may resolve the base model to a dated snapshot
        # (e.g. gpt-4o-mini -> gpt-4o-mini-2024-07-18), so record what it returned.
        self.datamodel.base_model_id = job.model

        return None

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task, format: DatasetFormat
    ) -> str:
        """Serialize one dataset split to JSONL and upload it for fine-tuning."""
        formatter = DatasetFormatter(
            dataset, self.datamodel.system_message, self.datamodel.thinking_instructions
        )
        jsonl_path = formatter.dump_to_file(
            split_name, format, self.datamodel.data_strategy
        )

        client = _get_openai_client()
        upload = await client.files.create(
            file=open(jsonl_path, "rb"),
            purpose="fine-tune",
        )
        file_id = upload.id
        if not file_id:
            raise ValueError("Failed to upload file to OpenAI")
        return file_id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        """Describe the tunable parameters OpenAI fine-tuning accepts."""
        batch_size = FineTuneParameter(
            name="batch_size",
            type="int",
            description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'",
        )
        learning_rate = FineTuneParameter(
            name="learning_rate_multiplier",
            type="float",
            description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'",
            optional=True,
        )
        epochs = FineTuneParameter(
            name="n_epochs",
            type="int",
            description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'",
            optional=True,
        )
        seed = FineTuneParameter(
            name="seed",
            type="int",
            description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
            optional=True,
        )
        return [batch_size, learning_rate, epochs, seed]
A fine-tuning adapter for OpenAI.
async def status(self) -> FineTuneStatus:
    """
    Get the status of the fine-tune.
    """
    # Fetch the latest status; write it through to the datamodel on change.
    latest = await self._status()
    if self.datamodel.latest_status != latest.status:
        self.datamodel.latest_status = latest.status
        if self.datamodel.path:
            self.datamodel.save_to_file()
    return latest
Get the status of the fine-tune.
async def generate_and_upload_jsonl(
    self, dataset: DatasetSplit, split_name: str, task: Task, format: DatasetFormat
) -> str:
    """Write the named split to a JSONL file and upload it to OpenAI.

    Returns:
        The OpenAI file ID of the uploaded file.

    Raises:
        ValueError: If the upload response lacks a file ID.
    """
    formatter = DatasetFormatter(
        dataset, self.datamodel.system_message, self.datamodel.thinking_instructions
    )
    jsonl_path = formatter.dump_to_file(
        split_name, format, self.datamodel.data_strategy
    )

    client = _get_openai_client()
    upload = await client.files.create(
        file=open(jsonl_path, "rb"),
        purpose="fine-tune",
    )
    file_id = upload.id
    if not file_id:
        raise ValueError("Failed to upload file to OpenAI")
    return file_id
@classmethod
def available_parameters(cls) -> list[FineTuneParameter]:
    """Returns a list of parameters that can be provided for this fine-tune. Includes hyperparameters, etc."""
    batch_size = FineTuneParameter(
        name="batch_size",
        type="int",
        description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'",
    )
    learning_rate_multiplier = FineTuneParameter(
        name="learning_rate_multiplier",
        type="float",
        description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'",
        optional=True,
    )
    n_epochs = FineTuneParameter(
        name="n_epochs",
        type="int",
        description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'",
        optional=True,
    )
    seed = FineTuneParameter(
        name="seed",
        type="int",
        description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
        optional=True,
    )
    return [batch_size, learning_rate_multiplier, n_epochs, seed]
Returns a list of parameters that can be provided for this fine-tune. Includes hyperparameters, etc.