kiln_ai.adapters.fine_tune.openai_finetune

import time

import openai
from openai.types.fine_tuning import FineTuningJob

from kiln_ai.adapters.fine_tune.base_finetune import (
    BaseFinetuneAdapter,
    FineTuneParameter,
    FineTuneStatus,
    FineTuneStatusType,
)
from kiln_ai.adapters.fine_tune.dataset_formatter import DatasetFormat, DatasetFormatter
from kiln_ai.datamodel import DatasetSplit, StructuredOutputMode, Task
from kiln_ai.utils.config import Config


def _get_openai_client():
    key = Config.shared().open_ai_api_key
    if not key:
        raise RuntimeError(
            "OpenAI API key not set. You must connect OpenAI in settings."
        )
    return openai.AsyncOpenAI(
        api_key=key,
    )


class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.
        """

        # Update the datamodel with the latest status if it has changed
        status = await self._status()
        if status.status != self.datamodel.latest_status:
            self.datamodel.latest_status = status.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return status

    async def _status(self) -> FineTuneStatus:
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Will raise an error if the job is not found, or for other issues
            oai_client = _get_openai_client()
            response = await oai_client.fine_tuning.jobs.retrieve(
                self.datamodel.provider_id
            )

            # If the fine-tuned model has been updated, update the datamodel
            try:
                if (
                    self.datamodel.fine_tune_model_id != response.fine_tuned_model
                    or self.datamodel.base_model_id != response.model
                ):
                    self.datamodel.fine_tune_model_id = response.fine_tuned_model
                    self.datamodel.base_model_id = response.model
                    self.datamodel.save_to_file()
            except Exception:
                # Don't let this error crash the status call
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{e!s}]",
            )

        if not response or not isinstance(response, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )
        if response.error and response.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{response.error.message} [Code: {response.error.code}]",
            )
        status = response.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ["validating_files", "running", "queued"]:
            time_to_finish_msg: str | None = None
            if response.estimated_finish is not None:
                time_to_finish_msg = f"Estimated finish time: {int(response.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine-tune job is running [{status}]. {time_to_finish_msg or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        # Use chat format for unstructured output, and JSON for formatted output (was previously function calls)
        format = DatasetFormat.OPENAI_CHAT_JSONL
        if task.output_json_schema:
            format = DatasetFormat.OPENAI_CHAT_JSON_SCHEMA_JSONL
            if self.datamodel.run_config is not None:
                self.datamodel.run_config.structured_output_mode = (
                    StructuredOutputMode.json_schema
                )
        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task, format
        )
        validation_file_id = None
        if self.datamodel.validation_split_name:
            validation_file_id = await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task, format
            )

        # Filter to hyperparameters which are set via the hyperparameters field (some, like seed, are set via top-level API arguments)
        hyperparameters = {
            k: v
            for k, v in self.datamodel.parameters.items()
            if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
        }

        oai_client = _get_openai_client()
        ft = await oai_client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = ft.id
        self.datamodel.fine_tune_model_id = ft.fine_tuned_model
        # The model ID can get more specific after the fine-tune call (e.g. gpt-4o-mini to gpt-4o-mini-2024-07-18), so we update it in the datamodel
        self.datamodel.base_model_id = ft.model

        return None

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task, format: DatasetFormat
    ) -> str:
        formatter = DatasetFormatter(
            dataset, self.datamodel.system_message, self.datamodel.thinking_instructions
        )
        path = await formatter.dump_to_file(
            split_name, format, self.datamodel.data_strategy
        )

        oai_client = _get_openai_client()
        # Use a context manager so the upload handle is closed promptly
        with open(path, "rb") as file:
            response = await oai_client.files.create(
                file=file,
                purpose="fine-tune",
            )
        id = response.id
        if not id:
            raise ValueError("Failed to upload file to OpenAI")
        return id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        return [
            FineTuneParameter(
                name="batch_size",
                type="int",
                description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'.",
            ),
            FineTuneParameter(
                name="learning_rate_multiplier",
                type="float",
                description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'.",
                optional=True,
            ),
            FineTuneParameter(
                name="n_epochs",
                type="int",
                description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'.",
                optional=True,
            ),
            FineTuneParameter(
                name="seed",
                type="int",
                description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
                optional=True,
            ),
        ]

class OpenAIFinetune(kiln_ai.adapters.fine_tune.base_finetune.BaseFinetuneAdapter):

A fine-tuning adapter for OpenAI.
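All OpenAI traffic in this adapter flows through the module-level _get_openai_client() helper, which raises a RuntimeError when no key is configured. A minimal sketch of checking that precondition up front (only Config.shared().open_ai_api_key is taken from the source; the guard itself is illustrative):

from kiln_ai.utils.config import Config

# _get_openai_client() raises RuntimeError if this key is missing,
# so fail fast with a friendlier message before starting a fine-tune.
if not Config.shared().open_ai_api_key:
    raise SystemExit("Connect OpenAI in settings before fine-tuning.")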

async def status(self) -> kiln_ai.adapters.fine_tune.base_finetune.FineTuneStatus:

Get the status of the fine-tune.
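A minimal polling sketch built on this method, assuming an already-constructed adapter instance (construction is handled by BaseFinetuneAdapter and not shown in this module). Note that status() also persists a changed status to the datamodel file as a side effect:

import asyncio

from kiln_ai.adapters.fine_tune.base_finetune import FineTuneStatusType


async def wait_for_fine_tune(adapter) -> None:
    # Poll until the job leaves the pending/running states.
    while True:
        status = await adapter.status()
        print(f"{status.status}: {status.message}")
        if status.status not in (
            FineTuneStatusType.pending,
            FineTuneStatusType.running,
        ):
            return
        await asyncio.sleep(60)  # OpenAI fine-tunes take minutes to hours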

async def generate_and_upload_jsonl(self, dataset: kiln_ai.datamodel.DatasetSplit, split_name: str, task: kiln_ai.datamodel.Task, format: kiln_ai.adapters.fine_tune.dataset_formatter.DatasetFormat) -> str:
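The upload half of this method corresponds to a single Files API call. A sketch under stated assumptions - the local path train.jsonl is hypothetical and the key is a placeholder; purpose="fine-tune" matches the source:

import asyncio

import openai


async def upload_jsonl(path: str) -> str:
    client = openai.AsyncOpenAI(api_key="sk-...")  # placeholder key
    # Mirrors generate_and_upload_jsonl: JSONL file in, OpenAI file ID out.
    with open(path, "rb") as f:
        response = await client.files.create(file=f, purpose="fine-tune")
    return response.id


print(asyncio.run(upload_jsonl("train.jsonl")))  # hypothetical local path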
@classmethod
def available_parameters(cls) -> list[kiln_ai.adapters.fine_tune.base_finetune.FineTuneParameter]:

Returns a list of parameters that can be provided for this fine-tune. Includes hyperparameters, etc.
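For example, a caller could render these parameters in a settings form or validate user input against them. A short sketch using only this classmethod and the FineTuneParameter fields visible in the source above:

from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune

# List the tunable parameters this adapter exposes.
for param in OpenAIFinetune.available_parameters():
    print(f"{param.name} ({param.type}): {param.description}")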