kiln_ai.adapters.fine_tune.openai_finetune

import time

import openai
from openai.types.fine_tuning import FineTuningJob

from kiln_ai.adapters.fine_tune.base_finetune import (
    BaseFinetuneAdapter,
    FineTuneParameter,
    FineTuneStatus,
    FineTuneStatusType,
)
from kiln_ai.adapters.fine_tune.dataset_formatter import DatasetFormat, DatasetFormatter
from kiln_ai.datamodel import DatasetSplit, StructuredOutputMode, Task
from kiln_ai.utils.config import Config


def _get_openai_client():
    key = Config.shared().open_ai_api_key
    if not key:
        raise RuntimeError(
            "OpenAI API key not set. You must connect OpenAI in settings."
        )
    return openai.AsyncOpenAI(
        api_key=key,
    )


class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.
        """

        # Update the datamodel with the latest status if it has changed
        status = await self._status()
        if status.status != self.datamodel.latest_status:
            self.datamodel.latest_status = status.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return status

    async def _status(self) -> FineTuneStatus:
        """Fetch the job from the OpenAI API and map it to a FineTuneStatus."""
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Will raise an error if the job is not found, or for other issues
            oai_client = _get_openai_client()
            response = await oai_client.fine_tuning.jobs.retrieve(
                self.datamodel.provider_id
            )

            # If the fine-tuned model has been updated, update the datamodel
            try:
                if (
                    self.datamodel.fine_tune_model_id != response.fine_tuned_model
                    or self.datamodel.base_model_id != response.model
                ):
                    self.datamodel.fine_tune_model_id = response.fine_tuned_model
                    self.datamodel.base_model_id = response.model
                    self.datamodel.save_to_file()
            except Exception:
                # Don't let this error crash the status call
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{e!s}]",
            )

        if not response or not isinstance(response, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )
        if response.error and response.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{response.error.message} [Code: {response.error.code}]",
            )
        status = response.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ["validating_files", "running", "queued"]:
            time_to_finish_msg: str | None = None
            if response.estimated_finish is not None:
                time_to_finish_msg = f"Estimated finish time: {int(response.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine tune job is running [{status}]. {time_to_finish_msg or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        """Format and upload the dataset, then create the OpenAI fine-tuning job."""
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        # Use chat format for unstructured output, and JSON schema for structured
        # output (this previously used function calls)
        format = DatasetFormat.OPENAI_CHAT_JSONL
        if task.output_json_schema:
            format = DatasetFormat.OPENAI_CHAT_JSON_SCHEMA_JSONL
            self.datamodel.structured_output_mode = StructuredOutputMode.json_schema
        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task, format
        )
        validation_file_id = None
        if self.datamodel.validation_split_name:
            validation_file_id = await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task, format
            )

        # Filter to the hyperparameters that are set via the hyperparameters field
        # (others, like seed, are set via top-level API arguments)
        hyperparameters = {
            k: v
            for k, v in self.datamodel.parameters.items()
            if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
        }

        oai_client = _get_openai_client()
        ft = await oai_client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = ft.id
        self.datamodel.fine_tune_model_id = ft.fine_tuned_model
        # The base model ID can become more specific after the fine-tune call
        # (e.g. gpt-4o-mini becomes gpt-4o-mini-2024-07-18), so update it in the datamodel
        self.datamodel.base_model_id = ft.model

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task, format: DatasetFormat
    ) -> str:
        """Build the JSONL file for a dataset split and upload it to OpenAI, returning the file ID."""
        formatter = DatasetFormatter(
            dataset, self.datamodel.system_message, self.datamodel.thinking_instructions
        )
        path = formatter.dump_to_file(split_name, format, self.datamodel.data_strategy)

        oai_client = _get_openai_client()
        # Use a context manager so the file handle is closed after the upload
        with open(path, "rb") as jsonl_file:
            response = await oai_client.files.create(
                file=jsonl_file,
                purpose="fine-tune",
            )
        file_id = response.id
        if not file_id:
            raise ValueError("Failed to upload file to OpenAI")
        return file_id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        """Returns a list of parameters that can be provided for this fine-tune. Includes hyperparameters, etc."""
        return [
            FineTuneParameter(
                name="batch_size",
                type="int",
                description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="learning_rate_multiplier",
                type="float",
                description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="n_epochs",
                type="int",
                description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="seed",
                type="int",
                description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
                optional=True,
            ),
        ]
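
A minimal usage sketch follows. The constructor is inherited from BaseFinetuneAdapter, which is defined outside this module, so constructing the adapter from a persisted Finetune datamodel record is an assumption here; only the status() call and its FineTuneStatus result (the .status and .message fields) are confirmed by the source above.

import asyncio

from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune


async def report_status(finetune_record) -> None:
    # Assumed constructor wiring: BaseFinetuneAdapter's __init__ is not shown
    # in this module, so check the base class for the real signature.
    adapter = OpenAIFinetune(datamodel=finetune_record)
    # status() polls the OpenAI API and persists any status change to disk.
    status = await adapter.status()
    print(f"{status.status}: {status.message}")


# Run against a saved Finetune record (hypothetical variable name):
# asyncio.run(report_status(my_finetune_record))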
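For reference, _start splits datamodel.parameters in two: n_epochs, learning_rate_multiplier, and batch_size travel inside the hyperparameters payload, while seed is forwarded as its own argument to fine_tuning.jobs.create. A small self-contained sketch of that filtering, using illustrative values only:

# Illustrative values only; mirrors the filtering logic in _start above.
parameters = {"n_epochs": 3, "batch_size": 8, "seed": 42}

hyperparameters = {
    k: v
    for k, v in parameters.items()
    if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
}

assert hyperparameters == {"n_epochs": 3, "batch_size": 8}
assert parameters.get("seed") == 42  # passed separately as seed=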