kiln_ai.adapters.fine_tune.openai_finetune
```python
import time

import openai
from openai.types.fine_tuning import FineTuningJob

from kiln_ai.adapters.fine_tune.base_finetune import (
    BaseFinetuneAdapter,
    FineTuneParameter,
    FineTuneStatus,
    FineTuneStatusType,
)
from kiln_ai.adapters.fine_tune.dataset_formatter import DatasetFormat, DatasetFormatter
from kiln_ai.datamodel import DatasetSplit, Task
from kiln_ai.utils.config import Config

oai_client = openai.AsyncOpenAI(
    api_key=Config.shared().open_ai_api_key or "",
)


class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.
        """

        # Update the datamodel with the latest status if it has changed
        status = await self._status()
        if status.status != self.datamodel.latest_status:
            self.datamodel.latest_status = status.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return status

    async def _status(self) -> FineTuneStatus:
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Will raise an error if the job is not found, or for other issues
            response = await oai_client.fine_tuning.jobs.retrieve(
                self.datamodel.provider_id
            )

            # If the fine-tuned model has been updated, update the datamodel
            try:
                if (
                    self.datamodel.fine_tune_model_id != response.fine_tuned_model
                    or self.datamodel.base_model_id != response.model
                ):
                    self.datamodel.fine_tune_model_id = response.fine_tuned_model
                    self.datamodel.base_model_id = response.model
                    self.datamodel.save_to_file()
            except Exception:
                # Don't let this error crash the status call
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{str(e)}]",
            )

        if not response or not isinstance(response, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )
        if response.error and response.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{response.error.message} [Code: {response.error.code}]",
            )
        status = response.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ["validating_files", "running", "queued"]:
            time_to_finish_msg: str | None = None
            if response.estimated_finish is not None:
                time_to_finish_msg = f"Estimated finish time: {int(response.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine tune job is running [{status}]. {time_to_finish_msg or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task
        )
        validation_file_id = None
        if self.datamodel.validation_split_name:
            validation_file_id = await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task
            )

        # Filter to hyperparameters which are set via the hyperparameters field (some like seed are set via the API)
        hyperparameters = {
            k: v
            for k, v in self.datamodel.parameters.items()
            if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
        }

        ft = await oai_client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = ft.id
        self.datamodel.fine_tune_model_id = ft.fine_tuned_model
        # Model can get more specific after fine-tune call (gpt-4o-mini to gpt-4o-mini-2024-07-18) so we update it in the datamodel
        self.datamodel.base_model_id = ft.model

        return None

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task
    ) -> str:
        formatter = DatasetFormatter(dataset, self.datamodel.system_message)
        # All OpenAI models support tool calls for structured outputs
        format = (
            DatasetFormat.OPENAI_CHAT_TOOLCALL_JSONL
            if task.output_json_schema
            else DatasetFormat.OPENAI_CHAT_JSONL
        )
        path = formatter.dump_to_file(split_name, format)

        response = await oai_client.files.create(
            file=open(path, "rb"),
            purpose="fine-tune",
        )
        id = response.id
        if not id:
            raise ValueError("Failed to upload file to OpenAI")
        return id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        return [
            FineTuneParameter(
                name="batch_size",
                type="int",
                description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'",
            ),
            FineTuneParameter(
                name="learning_rate_multiplier",
                type="float",
                description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="n_epochs",
                type="int",
                description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="seed",
                type="int",
                description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
                optional=True,
            ),
        ]
```
oai_client = <openai.AsyncOpenAI object>
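The client is constructed at import time from Kiln's shared config, so the OpenAI key should already be set when this module is first imported. A minimal sketch of that precondition check, assuming `Config.shared().open_ai_api_key` is the only setting involved (as the source above shows); raising an error here is an illustrative choice, not part of the library:

```python
from kiln_ai.utils.config import Config

# Because `oai_client` is created when the module is imported, verify the key first.
if not Config.shared().open_ai_api_key:
    raise RuntimeError("Set an OpenAI API key in Kiln's config before fine-tuning.")

# Safe to import the adapter now that a key is configured.
from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune
```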
class OpenAIFinetune(BaseFinetuneAdapter):
A fine-tuning adapter for OpenAI.
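A short usage sketch, assuming `finetune` is an `OpenAIFinetune` instance that has already been constructed through the `BaseFinetuneAdapter` machinery (construction is not shown in this module) and that its datamodel already points at an OpenAI job:

```python
import asyncio

from kiln_ai.adapters.fine_tune.base_finetune import FineTuneStatusType
from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune


async def report(finetune: OpenAIFinetune) -> None:
    # status() refreshes the datamodel's latest_status and returns a FineTuneStatus
    status = await finetune.status()
    print(f"{status.status}: {status.message}")
    if status.status == FineTuneStatusType.failed:
        raise RuntimeError(status.message)


# Assuming `finetune` was built elsewhere:
# asyncio.run(report(finetune))
```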
async def status(self) -> FineTuneStatus:
Get the status of the fine-tune.
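Because `status()` maps OpenAI's job states onto `FineTuneStatusType` (pending, running, completed, failed, unknown), callers can poll it until a terminal state is reached. A minimal polling sketch, assuming `finetune` is an already-constructed adapter; the 30-second interval is an arbitrary choice, and transient `unknown` results (for example rate limits) are simply retried:

```python
import asyncio

from kiln_ai.adapters.fine_tune.base_finetune import FineTuneStatus, FineTuneStatusType
from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune


async def wait_for_completion(
    finetune: OpenAIFinetune, poll_seconds: int = 30
) -> FineTuneStatus:
    # Poll until the job reaches a terminal state (completed or failed).
    while True:
        status = await finetune.status()
        if status.status in (FineTuneStatusType.completed, FineTuneStatusType.failed):
            return status
        print(f"Still working [{status.status}]: {status.message}")
        await asyncio.sleep(poll_seconds)
```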
async def generate_and_upload_jsonl(self, dataset: kiln_ai.datamodel.DatasetSplit, split_name: str, task: kiln_ai.datamodel.Task) -> str:
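Writes the requested split to a JSONL file via `DatasetFormatter` (using the tool-call chat format when the task has an output JSON schema, the plain chat format otherwise), uploads it to OpenAI with purpose "fine-tune", and returns the uploaded file's ID. For orientation, one record in OpenAI's plain chat fine-tuning JSONL format looks roughly like the sketch below; the exact fields `DatasetFormatter` emits (especially for the tool-call variant) are not shown in this module, so treat the content as illustrative:

```python
import json

# Illustrative only: one record in OpenAI's chat fine-tuning JSONL format.
example_record = {
    "messages": [
        {"role": "system", "content": "You are a helpful assistant."},
        {"role": "user", "content": "Summarize: Kiln can fine-tune models."},
        {"role": "assistant", "content": "Kiln can launch OpenAI fine-tune jobs."},
    ]
}
print(json.dumps(example_record))  # one JSON object per line in the .jsonl file
```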
@classmethod
def available_parameters(cls) -> list[kiln_ai.adapters.fine_tune.base_finetune.FineTuneParameter]:
Returns a list of parameters that can be provided for this fine-tune: the hyperparameters batch_size, learning_rate_multiplier, and n_epochs, plus the job seed.
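Of these, only n_epochs, learning_rate_multiplier, and batch_size are forwarded in the `hyperparameters` field of the job; seed is passed as its own argument to `fine_tuning.jobs.create` (see `_start` in the source above). A small sketch that lists the advertised parameters and mirrors that filtering; `user_params` and its values are hypothetical:

```python
from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune

# Inspect what the adapter advertises.
for param in OpenAIFinetune.available_parameters():
    print(f"{param.name} ({param.type}): {param.description}")

# Mirror the filtering _start() applies before calling the OpenAI API.
user_params = {"n_epochs": 3, "learning_rate_multiplier": 0.5, "seed": 42}
hyperparameters = {
    k: v
    for k, v in user_params.items()
    if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
}
seed = user_params.get("seed")  # passed separately to fine_tuning.jobs.create
```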