kiln_ai.adapters.fine_tune.openai_finetune

import time

import openai
from openai.types.fine_tuning import FineTuningJob

from kiln_ai.adapters.fine_tune.base_finetune import (
    BaseFinetuneAdapter,
    FineTuneParameter,
    FineTuneStatus,
    FineTuneStatusType,
)
from kiln_ai.adapters.fine_tune.dataset_formatter import DatasetFormat, DatasetFormatter
from kiln_ai.datamodel import DatasetSplit, Task
from kiln_ai.utils.config import Config

oai_client = openai.AsyncOpenAI(
    api_key=Config.shared().open_ai_api_key or "",
)


class OpenAIFinetune(BaseFinetuneAdapter):
    """
    A fine-tuning adapter for OpenAI.
    """

    async def status(self) -> FineTuneStatus:
        """
        Get the status of the fine-tune.
        """

        # Update the datamodel with the latest status if it has changed
        status = await self._status()
        if status.status != self.datamodel.latest_status:
            self.datamodel.latest_status = status.status
            if self.datamodel.path:
                self.datamodel.save_to_file()
        return status

    async def _status(self) -> FineTuneStatus:
        if not self.datamodel or not self.datamodel.provider_id:
            return FineTuneStatus(
                status=FineTuneStatusType.pending,
                message="This fine-tune has not been started or has not been assigned a provider ID.",
            )

        try:
            # Will raise an error if the job is not found, or for other issues
            response = await oai_client.fine_tuning.jobs.retrieve(
                self.datamodel.provider_id
            )

            # If the fine-tuned model has been updated, update the datamodel
            try:
                if (
                    self.datamodel.fine_tune_model_id != response.fine_tuned_model
                    or self.datamodel.base_model_id != response.model
                ):
                    self.datamodel.fine_tune_model_id = response.fine_tuned_model
                    self.datamodel.base_model_id = response.model
                    self.datamodel.save_to_file()
            except Exception:
                # Don't let this error crash the status call
                pass

        except openai.APIConnectionError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown, message="Server connection error"
            )
        except openai.RateLimitError:
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Rate limit exceeded. Could not fetch fine-tune status.",
            )
        except openai.APIStatusError as e:
            if e.status_code == 404:
                return FineTuneStatus(
                    status=FineTuneStatusType.unknown,
                    message="Job with this ID not found. It may have been deleted.",
                )
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message=f"Unknown error: [{str(e)}]",
            )

        if not response or not isinstance(response, FineTuningJob):
            return FineTuneStatus(
                status=FineTuneStatusType.unknown,
                message="Invalid response from OpenAI",
            )
        if response.error and response.error.code:
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message=f"{response.error.message} [Code: {response.error.code}]",
            )
        status = response.status
        if status == "failed":
            return FineTuneStatus(
                status=FineTuneStatusType.failed,
                message="Job failed - unknown reason",
            )
        if status == "cancelled":
            return FineTuneStatus(
                status=FineTuneStatusType.failed, message="Job cancelled"
            )
        if status in ["validating_files", "running", "queued"]:
            time_to_finish_msg: str | None = None
            if response.estimated_finish is not None:
                time_to_finish_msg = f"Estimated finish time: {int(response.estimated_finish - time.time())} seconds."
            return FineTuneStatus(
                status=FineTuneStatusType.running,
                message=f"Fine tune job is running [{status}]. {time_to_finish_msg or ''}",
            )
        if status == "succeeded":
            return FineTuneStatus(
                status=FineTuneStatusType.completed, message="Training job completed"
            )
        return FineTuneStatus(
            status=FineTuneStatusType.unknown,
            message=f"Unknown status: [{status}]",
        )

    async def _start(self, dataset: DatasetSplit) -> None:
        task = self.datamodel.parent_task()
        if not task:
            raise ValueError("Task is required to start a fine-tune")

        train_file_id = await self.generate_and_upload_jsonl(
            dataset, self.datamodel.train_split_name, task
        )
        validation_file_id = None
        if self.datamodel.validation_split_name:
            validation_file_id = await self.generate_and_upload_jsonl(
                dataset, self.datamodel.validation_split_name, task
            )

        # Filter to the hyperparameters set via the hyperparameters field (others, like seed, are passed as top-level API arguments)
        hyperparameters = {
            k: v
            for k, v in self.datamodel.parameters.items()
            if k in ["n_epochs", "learning_rate_multiplier", "batch_size"]
        }

        ft = await oai_client.fine_tuning.jobs.create(
            training_file=train_file_id,
            model=self.datamodel.base_model_id,
            validation_file=validation_file_id,
            seed=self.datamodel.parameters.get("seed"),  # type: ignore
            hyperparameters=hyperparameters,  # type: ignore
            suffix=f"kiln_ai.{self.datamodel.id}",
        )
        self.datamodel.provider_id = ft.id
        self.datamodel.fine_tune_model_id = ft.fine_tuned_model
        # The base model ID can become more specific after the fine-tune call
        # (e.g. gpt-4o-mini -> gpt-4o-mini-2024-07-18), so update it in the datamodel
        self.datamodel.base_model_id = ft.model

        return None

    async def generate_and_upload_jsonl(
        self, dataset: DatasetSplit, split_name: str, task: Task
    ) -> str:
        formatter = DatasetFormatter(dataset, self.datamodel.system_message)
        # All OpenAI models support tool calls for structured outputs
        format = (
            DatasetFormat.OPENAI_CHAT_TOOLCALL_JSONL
            if task.output_json_schema
            else DatasetFormat.OPENAI_CHAT_JSONL
        )
        path = formatter.dump_to_file(split_name, format)

        # Use a context manager so the upload file handle is always closed
        with open(path, "rb") as f:
            response = await oai_client.files.create(
                file=f,
                purpose="fine-tune",
            )
        id = response.id
        if not id:
            raise ValueError("Failed to upload file to OpenAI")
        return id

    @classmethod
    def available_parameters(cls) -> list[FineTuneParameter]:
        return [
            FineTuneParameter(
                name="batch_size",
                type="int",
                description="Number of examples in each batch. A larger batch size means that model parameters are updated less frequently, but with lower variance. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="learning_rate_multiplier",
                type="float",
                description="Scaling factor for the learning rate. A smaller learning rate may be useful to avoid overfitting. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="n_epochs",
                type="int",
                description="The number of epochs to train the model for. An epoch refers to one full cycle through the training dataset. Defaults to 'auto'",
                optional=True,
            ),
            FineTuneParameter(
                name="seed",
                type="int",
                description="The seed controls the reproducibility of the job. Passing in the same seed and job parameters should produce the same results, but may differ in rare cases. If a seed is not specified, one will be generated for you.",
                optional=True,
            ),
        ]
oai_client = <openai.AsyncOpenAI object>
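
This module-level client reads the OpenAI API key from Config.shared().open_ai_api_key once, at import time, falling back to an empty string. A minimal sketch of verifying that precondition up front (the check itself is illustrative, not part of the module):

from kiln_ai.utils.config import Config

# The key is captured when the module is imported; with the empty-string
# fallback, later API calls fail with an authentication error instead.
if not Config.shared().open_ai_api_key:
    raise RuntimeError("Configure an OpenAI API key in Kiln before importing this module")
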
class OpenAIFinetune(BaseFinetuneAdapter):

A fine-tuning adapter for OpenAI.

async def status(self) -> kiln_ai.adapters.fine_tune.base_finetune.FineTuneStatus:

Get the status of the fine-tune.
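
A caller might poll status() until the job settles. A minimal sketch, assuming an already-constructed OpenAIFinetune instance (construction is handled by BaseFinetuneAdapter and elided here; the polling interval is an arbitrary choice):

import asyncio

from kiln_ai.adapters.fine_tune.base_finetune import FineTuneStatusType
from kiln_ai.adapters.fine_tune.openai_finetune import OpenAIFinetune

async def wait_for_fine_tune(finetune: OpenAIFinetune) -> None:
    # Poll until the job leaves the pending/running states; each call also
    # persists the latest status to the datamodel when it changes.
    while True:
        status = await finetune.status()
        print(f"{status.status}: {status.message}")
        if status.status not in (FineTuneStatusType.pending, FineTuneStatusType.running):
            break
        await asyncio.sleep(60)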

async def generate_and_upload_jsonl(self, dataset: kiln_ai.datamodel.DatasetSplit, split_name: str, task: kiln_ai.datamodel.Task) -> str:
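
Formats the named split as JSONL (tool-call format when the task defines an output JSON schema, plain chat format otherwise), uploads the file to OpenAI, and returns the uploaded file's ID. For reference, each line of the plain chat format is one training example in OpenAI's standard chat fine-tuning shape; a schematic sketch of a single line follows, not the formatter's literal output:

import json

# Illustrative shape of a single OPENAI_CHAT_JSONL line; the placeholder
# values stand in for content drawn from the dataset split.
example_line = json.dumps(
    {
        "messages": [
            {"role": "system", "content": "<system_message>"},
            {"role": "user", "content": "<input>"},
            {"role": "assistant", "content": "<output>"},
        ]
    }
)
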
@classmethod
def available_parameters(cls) -> list[kiln_ai.adapters.fine_tune.base_finetune.FineTuneParameter]:

Returns a list of parameters that can be provided for this fine-tune. Includes hyperparameters, etc.
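
These names map onto the job creation call in _start: n_epochs, learning_rate_multiplier, and batch_size are forwarded via the hyperparameters field, while seed is passed as a top-level API argument. A plausible parameters dict, set on the Finetune datamodel before _start runs (values are hypothetical; the first three default to OpenAI's 'auto' when omitted, and a seed is generated if none is given):

parameters = {
    "n_epochs": 3,  # hypothetical value
    "learning_rate_multiplier": 0.1,  # hypothetical value
    "batch_size": 8,  # hypothetical value
    "seed": 42,  # sent as a top-level argument, not a hyperparameter
}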