kiln_ai.adapters.fine_tune.openai_finetune

import time

import openai
from openai.types.fine_tuning import FineTuningJob

from kiln_ai.adapters.fine_tune.base_finetune import (
    BaseFinetuneAdapter,
    FineTuneParameter,
    FineTuneStatus,
    FineTuneStatusType,
)
from kiln_ai.adapters.fine_tune.dataset_formatter import DatasetFormat, DatasetFormatter
from kiln_ai.datamodel import DatasetSplit, StructuredOutputMode, Task
from kiln_ai.utils.config import Config

oai_client = openai.AsyncOpenAI(
    api_key=Config.shared().open_ai_api_key or "",
)


class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.
        """

        # Update the datamodel with the latest status if it has changed
        status = await self._status()
        if status.status != self.datamodel.latest_status:
            self.datamodel.latest_status = status.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return status

    async def _status(self) -> FineTuneStatus:
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Will raise an error if the job is not found, or for other issues
            response = await oai_client.fine_tuning.jobs.retrieve(
                self.datamodel.provider_id
            )

            # If the fine-tuned model has been updated, update the datamodel
            try:
                if (
                    self.datamodel.fine_tune_model_id != response.fine_tuned_model
                    or self.datamodel.base_model_id != response.model
                ):
                    self.datamodel.fine_tune_model_id = response.fine_tuned_model
                    self.datamodel.base_model_id = response.model
                    self.datamodel.save_to_file()
            except Exception:
                # Don't let this error crash the status call
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{str(e)}]",
            )

        if not response or not isinstance(response, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )
        if response.error and response.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{response.error.message} [Code: {response.error.code}]",
            )
        status = response.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ["validating_files", "running", "queued"]:
            time_to_finish_msg: str | None = None
            if response.estimated_finish is not None:
                time_to_finish_msg = f"Estimated finish time: {int(response.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine tune job is running [{status}]. {time_to_finish_msg or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        # Use the chat format for unstructured output, and the JSON schema format
        # for structured output (previously function calls)
        format = DatasetFormat.OPENAI_CHAT_JSONL
        if task.output_json_schema:
            format = DatasetFormat.OPENAI_CHAT_JSON_SCHEMA_JSONL
            self.datamodel.structured_output_mode = StructuredOutputMode.json_schema
        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task, format
        )
        validation_file_id = None
        if self.datamodel.validation_split_name:
            validation_file_id = await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task, format
            )

        # Filter to hyperparameters which are set via the hyperparameters field
        # (some, like seed, are set via top-level API arguments instead)
        hyperparameters = {
            k: v
            for k, v in self.datamodel.parameters.items()
            if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
        }

        ft = await oai_client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = ft.id
        self.datamodel.fine_tune_model_id = ft.fine_tuned_model
        # The base model ID can become more specific after the fine-tune call
        # (e.g. gpt-4o-mini to gpt-4o-mini-2024-07-18), so we update it in the datamodel
        self.datamodel.base_model_id = ft.model

        return None

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task, format: DatasetFormat
    ) -> str:
        formatter = DatasetFormatter(
            dataset, self.datamodel.system_message, self.datamodel.thinking_instructions
        )
        path = formatter.dump_to_file(split_name, format, self.datamodel.data_strategy)

        # Use a context manager so the file handle is closed after the upload
        with open(path, "rb") as f:
            response = await oai_client.files.create(
                file=f,
                purpose="fine-tune",
            )
        id = response.id
        if not id:
            raise ValueError("Failed to upload file to OpenAI")
        return id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        return [
            FineTuneParameter(
                name="batch_size",
                type="int",
                description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'.",
            ),
            FineTuneParameter(
                name="learning_rate_multiplier",
                type="float",
                description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'.",
                optional=True,
            ),
            FineTuneParameter(
                name="n_epochs",
                type="int",
                description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'.",
                optional=True,
            ),
            FineTuneParameter(
                name="seed",
                type="int",
                description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
                optional=True,
            ),
        ]
oai_client = <openai.AsyncOpenAI object>
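The module-level async OpenAI client shared by all adapter calls. Its API key is read from Kiln's shared Config at import time, falling back to an empty string if no key is configured.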

class OpenAIFinetune(BaseFinetuneAdapter):

A fine-tuning adapter for OpenAI.
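A minimal usage sketch, not from the Kiln source: it assumes an existing `Finetune` datamodel record and that the adapter is constructed with that record via the inherited `BaseFinetuneAdapter`; the `datamodel=` keyword below matches the `self.datamodel` attribute used in this module, but the exact constructor signature is an assumption.

    import asyncio

    from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune

    async def check_finetune(finetune_record) -> None:
        # finetune_record: an existing kiln_ai.datamodel Finetune (assumed shape).
        # Constructor signature comes from BaseFinetuneAdapter (assumption).
        adapter = OpenAIFinetune(datamodel=finetune_record)
        status = await adapter.status()
        print(f"{status.status}: {status.message}")

    # asyncio.run(check_finetune(my_finetune))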

async def status(self) -> kiln_ai.adapters.fine_tune.base_finetune.FineTuneStatus:
27    async def status(self) -> FineTuneStatus:
28        """
29        Get the status of the fine-tune.
30        """
31
32        # Update the datamodel with the latest status if it has changed
33        status = await self._status()
34        if status.status != self.datamodel.latest_status:
35            self.datamodel.latest_status = status.status
36            if self.datamodel.path:
37                self.datamodel.save_to_file()
38        return status

Get the status of the fine-tune.
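Because `status()` both fetches the provider-side job state and persists any change to the datamodel, callers can poll it directly. A hypothetical polling helper (`wait_for_completion` is not part of this module; the terminal states it checks for - completed, failed, unknown - are the ones `_status()` can return):

    import asyncio

    from kiln_ai.adapters.fine_tune.base_finetune import (
        FineTuneStatus,
        FineTuneStatusType,
    )

    async def wait_for_completion(adapter, poll_seconds: float = 60.0) -> FineTuneStatus:
        # Keep polling while the job is still pending or running.
        while True:
            status = await adapter.status()
            print(f"{status.status}: {status.message}")
            if status.status not in (
                FineTuneStatusType.pending,
                FineTuneStatusType.running,
            ):
                return status
            await asyncio.sleep(poll_seconds)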

async def generate_and_upload_jsonl(self, dataset: kiln_ai.datamodel.DatasetSplit, split_name: str, task: kiln_ai.datamodel.Task, format: kiln_ai.adapters.fine_tune.dataset_formatter.DatasetFormat) -> str:
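
Formats the given dataset split with `DatasetFormatter`, dumps it to a local JSONL file, uploads it via OpenAI's files API with the `fine-tune` purpose, and returns the uploaded file's ID. Raises `ValueError` if the upload does not return a file ID.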
@classmethod
def available_parameters(cls) -> list[kiln_ai.adapters.fine_tune.base_finetune.FineTuneParameter]:

Returns the list of parameters that can be provided for this fine-tune, including hyperparameters such as batch size, learning rate multiplier, number of epochs, and seed.
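A short sketch of how these parameters flow through the adapter; the parameter names mirror the source above, while everything else is illustrative:

    from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune

    # List the tunable parameters with their types and descriptions.
    for param in OpenAIFinetune.available_parameters():
        print(f"{param.name} ({param.type}): {param.description}")

    # When a job is created, n_epochs, learning_rate_multiplier, and batch_size
    # are forwarded to OpenAI as hyperparameters, while seed is passed as a
    # top-level argument to fine_tuning.jobs.create (see _start above).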