diff --git a/guides/GettingStarted.md b/guides/GettingStarted.md
index 247f3798..1dfd00f4 100644
--- a/guides/GettingStarted.md
+++ b/guides/GettingStarted.md
@@ -849,3 +849,57 @@ val runSteps = openAI.runSteps(
     runId = RunId("run_abc123")
 )
 ```
+
+### Event streaming
+
+Create a thread, run it in a single request, and process the streaming events.
+
+```kotlin
+openAI.createStreamingThreadRun(
+    request = ThreadRunRequest(
+        assistantId = AssistantId("asst_abc123"),
+        thread = ThreadRequest(
+            messages = listOf(
+                ThreadMessage(
+                    role = Role.User,
+                    content = "Explain deep learning to a 5 year old."
+                )
+            )
+        ),
+    )
+)
+    .onEach { assistantStreamEvent: AssistantStreamEvent -> println(assistantStreamEvent) }
+    .collect()
+```
+
+Get the data object from an `AssistantStreamEvent`.
+
+```kotlin
+// The data type to use for each event is listed in AssistantStreamEventType
+when (assistantStreamEvent.type) {
+    AssistantStreamEventType.THREAD_CREATED -> {
+        val thread = assistantStreamEvent.getData<Thread>()
+        ...
+    }
+    AssistantStreamEventType.THREAD_MESSAGE_CREATED -> {
+        val message = assistantStreamEvent.getData<Message>()
+        ...
+    }
+    AssistantStreamEventType.UNKNOWN -> {
+        // The data field is a raw string and can be used instead of calling getData
+        val data = assistantStreamEvent.data
+        // Handle the unknown event type
+    }
+}
+```
+
+If a new event type is released before the library is updated, you can deserialize the payload into your own type by providing a `KSerializer`.
+
+```kotlin
+when (assistantStreamEvent.type) {
+    AssistantStreamEventType.UNKNOWN -> {
+        val data = assistantStreamEvent.getData(myCustomSerializer)
+        ...
+    }
+}
+```
\ No newline at end of file
diff --git a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/Runs.kt b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/Runs.kt
index 1d714e6b..a518124f 100644
--- a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/Runs.kt
+++ b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/Runs.kt
@@ -6,6 +6,8 @@ import com.aallam.openai.api.core.SortOrder
 import com.aallam.openai.api.core.Status
 import com.aallam.openai.api.run.*
 import com.aallam.openai.api.thread.ThreadId
+import io.ktor.sse.ServerSentEvent
+import kotlinx.coroutines.flow.Flow

 /**
  * Represents an execution run on a thread.
@@ -23,6 +25,21 @@ public interface Runs {
     @BetaOpenAI
     public suspend fun createRun(threadId: ThreadId, request: RunRequest, requestOptions: RequestOptions? = null): Run

+    /**
+     * Create a run with event streaming.
+     *
+     * @param threadId the ID of the thread to run
+     * @param request request for the run
+     * @param requestOptions request options.
+     * @return a flow of [AssistantStreamEvent]s emitted while the run executes.
+     */
+    @BetaOpenAI
+    public suspend fun createStreamingRun(
+        threadId: ThreadId,
+        request: RunRequest,
+        requestOptions: RequestOptions? = null
+    ): Flow<AssistantStreamEvent>
+
     /**
      * Retrieves a run.
      *
@@ -92,6 +109,25 @@
         requestOptions: RequestOptions? = null
     ): Run

+    /**
+     * When a run has the status [Status.RequiresAction] and the required action is [RequiredAction.SubmitToolOutputs],
+     * this endpoint can be used to submit the outputs from the tool calls once they're all completed, with event
+     * streaming. All outputs must be submitted in a single request.
+     *
+     * @param threadId the ID of the thread to which this run belongs
+     * @param runId the ID of the run to submit tool outputs for
+     * @param output list of tool outputs to submit
+     * @param requestOptions request options.
+     * @return a flow of [AssistantStreamEvent]s emitted while the run executes.
+     */
+    @BetaOpenAI
+    public suspend fun submitStreamingToolOutput(
+        threadId: ThreadId,
+        runId: RunId,
+        output: List<ToolOutput>,
+        requestOptions: RequestOptions? = null
+    ): Flow<AssistantStreamEvent>
+
     /**
      * Cancels a run that is [Status.InProgress].
      *
@@ -111,6 +147,19 @@
     @BetaOpenAI
     public suspend fun createThreadRun(request: ThreadRunRequest, requestOptions: RequestOptions? = null): Run

+    /**
+     * Create a thread and run it in one request, with event streaming.
+     *
+     * @param request request for a thread run
+     * @param requestOptions request options.
+     * @return a flow of [AssistantStreamEvent]s emitted while the run executes.
+     */
+    @BetaOpenAI
+    public suspend fun createStreamingThreadRun(
+        request: ThreadRunRequest,
+        requestOptions: RequestOptions? = null
+    ): Flow<AssistantStreamEvent>
+
     /**
      * Retrieves a run step.
      *
diff --git a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/extension/AssistantStreamEvent.kt b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/extension/AssistantStreamEvent.kt
new file mode 100644
index 00000000..c66ba111
--- /dev/null
+++ b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/extension/AssistantStreamEvent.kt
@@ -0,0 +1,30 @@
+package com.aallam.openai.client.extension
+
+import com.aallam.openai.api.run.AssistantStreamEvent
+import com.aallam.openai.client.internal.JsonLenient
+import kotlinx.serialization.KSerializer
+
+/**
+ * Get the data of the [AssistantStreamEvent], decoded with the serializer registered for the corresponding event type.
+ * @param T the type of the data.
+ * @throws IllegalStateException if the [AssistantStreamEvent] data is null.
+ * @throws ClassCastException if the [AssistantStreamEvent] data cannot be cast to the provided type.
+ */
+@Suppress("UNCHECKED_CAST")
+public fun <T> AssistantStreamEvent.getData(): T {
+    return type
+        .let { it.serializer as? KSerializer<T> }
+        ?.let(::getData)
+        ?: throw IllegalStateException("Failed to decode ServerSentEvent: $rawType")
+}
+
+
+/**
+ * Get the data of the [AssistantStreamEvent] using the provided [serializer].
+ * @throws IllegalStateException if the [AssistantStreamEvent] data is null.
+ * @throws ClassCastException if the [AssistantStreamEvent] data cannot be cast to the provided type.
+ */
+public fun <T> AssistantStreamEvent.getData(serializer: KSerializer<T>): T =
+    data
+        ?.let { JsonLenient.decodeFromString(serializer, it) }
+        ?: throw IllegalStateException("ServerSentEvent data was null: $rawType")
diff --git a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/extension/ServerSentEvent.kt b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/extension/ServerSentEvent.kt
new file mode 100644
index 00000000..635c0d43
--- /dev/null
+++ b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/extension/ServerSentEvent.kt
@@ -0,0 +1,19 @@
+package com.aallam.openai.client.extension
+
+import com.aallam.openai.api.run.AssistantStreamEvent
+import com.aallam.openai.api.run.AssistantStreamEventType
+import com.aallam.openai.client.internal.JsonLenient
+import io.ktor.sse.ServerSentEvent
+import kotlinx.serialization.KSerializer
+
+/**
+ * Convert a [ServerSentEvent] to an [AssistantStreamEvent]. The type is [AssistantStreamEventType.UNKNOWN] if the event field is null or unrecognized.
+ */
+internal fun ServerSentEvent.toAssistantStreamEvent(): AssistantStreamEvent =
+    AssistantStreamEvent(
+        event,
+        event
+            ?.let(AssistantStreamEventType::fromEvent)
+            ?: AssistantStreamEventType.UNKNOWN,
+        data
+    )
diff --git a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/HttpClient.kt b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/HttpClient.kt
index 85280f74..135d3e64 100644
--- a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/HttpClient.kt
+++ b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/HttpClient.kt
@@ -11,6 +11,7 @@ import io.ktor.client.plugins.auth.*
 import io.ktor.client.plugins.auth.providers.*
 import io.ktor.client.plugins.contentnegotiation.*
 import io.ktor.client.plugins.logging.*
+import io.ktor.client.plugins.sse.SSE
 import io.ktor.http.*
 import io.ktor.serialization.kotlinx.*
 import io.ktor.util.*
@@ -71,6 +72,8 @@ internal fun createHttpClient(config: OpenAIConfig): HttpClient {
         exponentialDelay(config.retry.base, config.retry.maxDelay.inWholeMilliseconds)
     }

+    install(SSE)
+
     defaultRequest {
         url(config.host.baseUrl)
         config.host.queryParams.onEach { (key, value) -> url.parameters.appendIfNameAbsent(key, value) }
diff --git a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/api/RunsApi.kt b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/api/RunsApi.kt
index 6b63177b..7dfa17ce 100644
--- a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/api/RunsApi.kt
+++ b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/api/RunsApi.kt
@@ -1,5 +1,6 @@
 package com.aallam.openai.client.internal.api

+import com.aallam.openai.api.BetaOpenAI
 import com.aallam.openai.api.core.PaginatedList
 import com.aallam.openai.api.core.RequestOptions
 import com.aallam.openai.api.core.SortOrder
@@ -13,13 +14,16 @@ import com.aallam.openai.client.internal.http.perform
 import io.ktor.client.call.*
 import io.ktor.client.request.*
 import io.ktor.http.*
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.onEach

 internal class RunsApi(val requester: HttpRequester) : Runs {
     override suspend fun createRun(threadId: ThreadId, request: RunRequest, requestOptions: RequestOptions?): Run {
         return requester.perform {
             it.post {
                 url(path = "${ApiPath.Threads}/${threadId.id}/runs")
-                setBody(request)
+                setBody(request.copy(stream = false))
                 contentType(ContentType.Application.Json)
                 beta("assistants", 2)
                 requestOptions(requestOptions)
@@ -27,6 +31,20 @@ internal class RunsApi(val requester: HttpRequester) : Runs {
         }
     }

+    @BetaOpenAI
+    override suspend fun createStreamingRun(threadId: ThreadId, request: RunRequest, requestOptions: RequestOptions?): Flow<AssistantStreamEvent> {
+        return requester
+            .performSse {
+                url(path = "${ApiPath.Threads}/${threadId.id}/runs")
+                setBody(request.copy(stream = true))
+                contentType(ContentType.Application.Json)
+                accept(ContentType.Text.EventStream)
+                beta("assistants", 2)
+                requestOptions(requestOptions)
+                method = HttpMethod.Post
+            }
+    }
+
     override suspend fun getRun(threadId: ThreadId, runId: RunId, requestOptions: RequestOptions?): Run {
         return requester.perform {
             it.get {
@@ -95,6 +113,25 @@
         }
     }

+    @BetaOpenAI
+    override suspend fun submitStreamingToolOutput(
+        threadId: ThreadId,
+        runId: RunId,
+        output: List<ToolOutput>,
+        requestOptions: RequestOptions?
+    ): Flow<AssistantStreamEvent> {
+        return requester
+            .performSse {
+                url(path = "${ApiPath.Threads}/${threadId.id}/runs/${runId.id}/submit_tool_outputs")
+                setBody(mapOf("tool_outputs" to output, "stream" to true))
+                contentType(ContentType.Application.Json)
+                accept(ContentType.Text.EventStream)
+                beta("assistants", 2)
+                requestOptions(requestOptions)
+                method = HttpMethod.Post
+            }
+    }
+
     override suspend fun cancel(threadId: ThreadId, runId: RunId, requestOptions: RequestOptions?): Run {
         return requester.perform {
             it.post {
@@ -109,7 +146,7 @@
         return requester.perform {
             it.post {
                 url(path = "${ApiPath.Threads}/runs")
-                setBody(request)
+                setBody(request.copy(stream = false))
                 contentType(ContentType.Application.Json)
                 beta("assistants", 2)
                 requestOptions(requestOptions)
@@ -117,6 +154,24 @@
         }
     }

+    @BetaOpenAI
+    override suspend fun createStreamingThreadRun(
+        request: ThreadRunRequest,
+        requestOptions: RequestOptions?
+    ): Flow<AssistantStreamEvent> {
+        return requester
+            .performSse {
+                url(path = "${ApiPath.Threads}/runs")
+                setBody(request.copy(stream = true))
+                contentType(ContentType.Application.Json)
+                accept(ContentType.Text.EventStream)
+                beta("assistants", 2)
+                requestOptions(requestOptions)
+                method = HttpMethod.Post
+            }
+    }
+
+
     override suspend fun runStep(
         threadId: ThreadId,
         runId: RunId,
diff --git a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpRequester.kt b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpRequester.kt
index d29ecd3c..359f7cde 100644
--- a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpRequester.kt
+++ b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpRequester.kt
@@ -1,9 +1,13 @@
 package com.aallam.openai.client.internal.http

+import com.aallam.openai.api.run.AssistantStreamEvent
 import io.ktor.client.*
+import io.ktor.client.plugins.sse.ClientSSESession
 import io.ktor.client.request.*
 import io.ktor.client.statement.*
+import io.ktor.sse.ServerSentEvent
 import io.ktor.util.reflect.*
+import kotlinx.coroutines.flow.Flow

 /**
  * Http request performer.
  */
 internal interface HttpRequester : AutoCloseable {

     /**
      */
     suspend fun <T : Any> perform(info: TypeInfo, block: suspend (HttpClient) -> HttpResponse): T

+    /**
+     * Perform an HTTP request and emit the resulting server-sent events as a [Flow].
+     */
+    suspend fun performSse(
+        builderBlock: HttpRequestBuilder.() -> Unit
+    ): Flow<AssistantStreamEvent>
+
     /**
      * Perform an HTTP request and get a result.
      *
diff --git a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpTransport.kt b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpTransport.kt
index c8c7bc03..445d2183 100644
--- a/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpTransport.kt
+++ b/openai-client/src/commonMain/kotlin/com.aallam.openai.client/internal/http/HttpTransport.kt
@@ -1,15 +1,26 @@
 package com.aallam.openai.client.internal.http

 import com.aallam.openai.api.exception.*
+import com.aallam.openai.api.run.AssistantStreamEvent
+import com.aallam.openai.client.extension.toAssistantStreamEvent
+import com.aallam.openai.client.internal.api.ApiPath
 import io.ktor.client.*
 import io.ktor.client.call.*
 import io.ktor.client.network.sockets.*
 import io.ktor.client.plugins.*
+import io.ktor.client.plugins.sse.ClientSSESession
+import io.ktor.client.plugins.sse.sseSession
 import io.ktor.client.request.*
 import io.ktor.client.statement.*
+import io.ktor.http.ContentType
+import io.ktor.sse.ServerSentEvent
 import io.ktor.util.reflect.*
 import io.ktor.utils.io.errors.*
 import kotlinx.coroutines.CancellationException
+import kotlinx.coroutines.flow.Flow
+import kotlinx.coroutines.flow.collect
+import kotlinx.coroutines.flow.map
+import kotlinx.coroutines.flow.onEach

 /** HTTP transport layer */
 internal class HttpTransport(private val httpClient: HttpClient) : HttpRequester {
@@ -35,6 +46,19 @@
         }
     }

+    override suspend fun performSse(
+        builderBlock: HttpRequestBuilder.() -> Unit
+    ): Flow<AssistantStreamEvent> {
+        try {
+            return httpClient
+                .sseSession(block = builderBlock)
+                .incoming
+                .map(ServerSentEvent::toAssistantStreamEvent)
+        } catch (e: Exception) {
+            throw handleException(e)
+        }
+    }
+
     override fun close() {
         httpClient.close()
     }
diff --git a/openai-client/src/commonTest/kotlin/com/aallam/openai/client/TestRuns.kt b/openai-client/src/commonTest/kotlin/com/aallam/openai/client/TestRuns.kt
index 9f32318d..08b055c9 100644
--- a/openai-client/src/commonTest/kotlin/com/aallam/openai/client/TestRuns.kt
+++ b/openai-client/src/commonTest/kotlin/com/aallam/openai/client/TestRuns.kt
@@ -6,14 +6,23 @@ import com.aallam.openai.api.core.PaginatedList
 import com.aallam.openai.api.core.Role
 import com.aallam.openai.api.message.MessageRequest
 import com.aallam.openai.api.model.ModelId
+import com.aallam.openai.api.run.AssistantStreamEvent
+import com.aallam.openai.api.run.AssistantStreamEventType
+import com.aallam.openai.api.run.Run
 import com.aallam.openai.api.run.RunRequest
 import com.aallam.openai.api.run.RunStep
 import com.aallam.openai.api.run.ThreadRunRequest
 import com.aallam.openai.api.thread.ThreadMessage
 import com.aallam.openai.api.thread.ThreadRequest
+import com.aallam.openai.client.extension.getData
 import com.aallam.openai.client.internal.JsonLenient
+import kotlinx.coroutines.flow.filter
+import kotlinx.coroutines.flow.onEach
+import kotlinx.coroutines.flow.singleOrNull
 import kotlin.test.Test
 import kotlin.test.assertEquals
+import kotlin.test.assertNotNull
+import kotlin.test.assertTrue

 class TestRuns : TestOpenAI() {

@@ -47,6 +56,45 @@
         assertEquals(1, runs.size)
     }

+    @Test
+    fun streamingRuns() = test {
+        val assistant = openAI.assistant(
+            request = assistantRequest {
+                name = "Math Tutor"
+                tools = listOf(AssistantTool.CodeInterpreter)
+                model = ModelId("gpt-4o")
+            }
+        )
+        val thread = openAI.thread()
+        val request = RunRequest(assistantId = assistant.id)
+        openAI.message(
+            threadId = thread.id,
+            request = MessageRequest(
+                role = Role.User,
+                content = "solve me 1 + 1",
+                metadata = mapOf(),
+            ),
+            requestOptions = null,
+        )
+
+        val runCompletedEvent = openAI
+            .createStreamingRun(threadId = thread.id, request = request)
+            .filter { it.type == AssistantStreamEventType.THREAD_RUN_COMPLETED }
+            .singleOrNull()
+
+        assertNotNull(runCompletedEvent)
+
+        val run = runCompletedEvent.getData<Run>()
+
+        assertEquals(thread.id, run.threadId)
+
+        var retrieved = openAI.getRun(threadId = thread.id, runId = run.id)
+        assertEquals(run.id, retrieved.id)
+
+        val runs = openAI.runs(threadId = thread.id)
+        assertEquals(1, runs.size)
+    }
+
     @Test
     fun threadAndRuns() = test {
         val assistant = openAI.assistant(
@@ -74,6 +122,42 @@
         assertEquals(0, runs.size)
     }

+    @Test
+    fun streamingThreadAndRuns() = test {
+        val assistant = openAI.assistant(
+            request = assistantRequest {
+                name = "Math Tutor"
+                tools = listOf(AssistantTool.CodeInterpreter)
+                model = ModelId("gpt-4o")
+            }
+        )
+        val request = ThreadRunRequest(
+            thread = ThreadRequest(
+                listOf(
+                    ThreadMessage(
+                        role = Role.User,
+                        content = "solve 1 + 2",
+                    )
+                )
+            ),
+            assistantId = assistant.id,
+        )
+
+        val runCompletedEvent = openAI
+            .createStreamingThreadRun(request = request)
+            .filter { it.type == AssistantStreamEventType.THREAD_RUN_COMPLETED }
+            .singleOrNull()
+
+        assertNotNull(runCompletedEvent)
+
+        val run = runCompletedEvent.getData<Run>()
+
+        assertEquals(assistant.id, run.assistantId)
+
+        val runs = openAI.runSteps(threadId = run.threadId, runId = run.id)
+        assertEquals(1, runs.size)
+    }
+
     @Test
     fun json() = test {
         val json = """
diff --git a/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/AssistantStreamEvent.kt b/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/AssistantStreamEvent.kt
new file mode 100644
index 00000000..264038ba
--- /dev/null
+++ b/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/AssistantStreamEvent.kt
@@ -0,0 +1,251 @@
+package com.aallam.openai.api.run
+
+import com.aallam.openai.api.BetaOpenAI
+import com.aallam.openai.api.core.Role
+import com.aallam.openai.api.message.Message
+import com.aallam.openai.api.message.MessageContent
+import com.aallam.openai.api.message.MessageId
+import com.aallam.openai.api.thread.Thread
+import kotlinx.serialization.KSerializer
+import kotlinx.serialization.SerialName
+import kotlinx.serialization.Serializable
+import kotlinx.serialization.builtins.serializer
+import kotlinx.serialization.descriptors.PrimitiveKind
+import kotlinx.serialization.descriptors.PrimitiveSerialDescriptor
+import kotlinx.serialization.descriptors.SerialDescriptor
+import kotlinx.serialization.encoding.Decoder
+import kotlinx.serialization.encoding.Encoder
+import kotlin.reflect.KClass
+
+/**
+ * Represents an event emitted when streaming a run.
+ * @property rawType the raw string of the event type.
+ * @property type the type of the event, or [AssistantStreamEventType.UNKNOWN] if unrecognized.
+ * @property data the serialized string representation of the event data.
+ */
+@BetaOpenAI
+@Serializable
+public data class AssistantStreamEvent(
+    @SerialName("rawType") val rawType: String?,
+    @SerialName("type") val type: AssistantStreamEventType,
+    @SerialName("data") val data: String?
+)
+
+/**
+ * Represents a run step delta, i.e. any fields that have changed on a run step during streaming.
+ * @property id the identifier of the run step, which can be referenced in API endpoints.
+ * @property object the object type, which is always thread.run.step.delta.
+ * @property delta the delta containing the fields that have changed on the run step.
+ */
+@BetaOpenAI
+@Serializable
+public data class RunStepDelta(
+    @SerialName("id") val id: RunStepId,
+    @SerialName("object") val `object`: String,
+    @SerialName("delta") val delta: RunStepDeltaData
+)
+
+/**
+ * The delta containing the fields that have changed on the run step.
+ * @property stepDetails the details of the run step.
+ */
+@BetaOpenAI
+@Serializable
+public data class RunStepDeltaData(
+    @SerialName("step_details") val stepDetails: RunStepDetails
+)
+
+/**
+ * Represents a message delta, i.e. any fields that have changed on a message during streaming.
+ * @property id the identifier of the message, which can be referenced in API endpoints.
+ * @property object the object type, which is always thread.message.delta.
+ * @property delta the delta containing the fields that have changed on the message.
+ */
+@BetaOpenAI
+@Serializable
+public data class MessageDelta(
+    @SerialName("id") val id: MessageId,
+    @SerialName("object") val `object`: String,
+    @SerialName("delta") val delta: MessageDeltaData
+)
+
+/**
+ * The delta containing the fields that have changed on the message.
+ * @property role the entity that produced the message. One of user or assistant.
+ * @property content the content of the message as an array of text and/or images.
+ */
+@BetaOpenAI
+@Serializable
+public data class MessageDeltaData(
+    @SerialName("role") val role: Role,
+    @SerialName("content") val content: MessageContent
+)
+
+/**
+ * Represents an event type emitted when streaming a Run.
+ * @property event the string representation of the event type.
+ * @property dataType the type of the data.
+ * @property serializer the serializer corresponding to the data type.
+ */
+@BetaOpenAI
+@Serializable(with = AssistantStreamEventTypeSerializer::class)
+public enum class AssistantStreamEventType(
+    public val event: String,
+    @Suppress("MemberVisibilityCanBePrivate") public val dataType: KClass<*>,
+    public val serializer: KSerializer<*>
+) {
+
+    /**
+     * Occurs when a new thread is created.
+     */
+    THREAD_CREATED("thread.created", Thread::class, Thread.serializer()),
+
+    /**
+     * Occurs when a new run is created.
+     */
+    THREAD_RUN_CREATED("thread.run.created", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run moves to a queued status.
+     */
+    THREAD_RUN_QUEUED("thread.run.queued", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run moves to an in_progress status.
+     */
+    THREAD_RUN_IN_PROGRESS("thread.run.in_progress", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run moves to a requires_action status.
+     */
+    THREAD_RUN_REQUIRES_ACTION("thread.run.requires_action", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run is completed.
+     */
+    THREAD_RUN_COMPLETED("thread.run.completed", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run ends with status incomplete.
+     */
+    THREAD_RUN_INCOMPLETE("thread.run.incomplete", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run fails.
+     */
+    THREAD_RUN_FAILED("thread.run.failed", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run moves to a cancelling status.
+     */
+    THREAD_RUN_CANCELLING("thread.run.cancelling", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run is cancelled.
+     */
+    THREAD_RUN_CANCELLED("thread.run.cancelled", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run expires.
+     */
+    THREAD_RUN_EXPIRED("thread.run.expired", Run::class, Run.serializer()),
+
+    /**
+     * Occurs when a run step is created.
+     */
+    THREAD_RUN_STEP_CREATED("thread.run.step.created", RunStep::class, RunStep.serializer()),
+
+    /**
+     * Occurs when a run step moves to an in_progress state.
+     */
+    THREAD_RUN_STEP_IN_PROGRESS("thread.run.step.in_progress", RunStep::class, RunStep.serializer()),
+
+    /**
+     * Occurs when parts of a run step are being streamed.
+     */
+    THREAD_RUN_STEP_DELTA("thread.run.step.delta", RunStepDelta::class, RunStepDelta.serializer()),
+
+    /**
+     * Occurs when a run step is completed.
+     */
+    THREAD_RUN_STEP_COMPLETED("thread.run.step.completed", RunStep::class, RunStep.serializer()),
+
+    /**
+     * Occurs when a run step fails.
+     */
+    THREAD_RUN_STEP_FAILED("thread.run.step.failed", RunStep::class, RunStep.serializer()),
+
+    /**
+     * Occurs when a run step is cancelled.
+     */
+    THREAD_RUN_STEP_CANCELLED("thread.run.step.cancelled", RunStep::class, RunStep.serializer()),
+
+    /**
+     * Occurs when a run step expires.
+     */
+    THREAD_RUN_STEP_EXPIRED("thread.run.step.expired", RunStep::class, RunStep.serializer()),
+
+    /**
+     * Occurs when a message is created.
+     */
+    THREAD_MESSAGE_CREATED("thread.message.created", Message::class, Message.serializer()),
+
+    /**
+     * Occurs when a message moves to an in_progress state.
+     */
+    THREAD_MESSAGE_IN_PROGRESS("thread.message.in_progress", Message::class, Message.serializer()),
+
+    /**
+     * Occurs when parts of a Message are being streamed.
+     */
+    THREAD_MESSAGE_DELTA("thread.message.delta", MessageDelta::class, MessageDelta.serializer()),
+
+    /**
+     * Occurs when a message is completed.
+     */
+    THREAD_MESSAGE_COMPLETED("thread.message.completed", Message::class, Message.serializer()),
+
+    /**
+     * Occurs when a message ends before it is completed.
+     */
+    THREAD_MESSAGE_INCOMPLETE("thread.message.incomplete", Message::class, Message.serializer()),
+
+    /**
+     * Occurs when an error occurs. This can happen due to an internal server error or a timeout.
+     */
+    ERROR("error", String::class, String.serializer()),
+
+    /**
+     * Occurs when a stream ends.
+     * data is [DONE]
+     */
+    DONE("done", String::class, String.serializer()),
+
+    /**
+     * Occurs when the event type is not recognized.
+     */
+    UNKNOWN("unknown", String::class, String.serializer());
+
+    public companion object {
+        public fun fromEvent(event: String): AssistantStreamEventType =
+            entries
+                .find { it.event == event }
+                ?: UNKNOWN
+    }
+}
+
+/**
+ * Custom serializer for [AssistantStreamEventType].
+ */
+@OptIn(BetaOpenAI::class)
+public class AssistantStreamEventTypeSerializer : KSerializer<AssistantStreamEventType> {
+    override val descriptor: SerialDescriptor = PrimitiveSerialDescriptor("AssistantStreamEventType", PrimitiveKind.STRING)
+
+    override fun deserialize(decoder: Decoder): AssistantStreamEventType {
+        val value = decoder.decodeString()
+        return AssistantStreamEventType.entries.single { value == it.event }
+    }
+    override fun serialize(encoder: Encoder, value: AssistantStreamEventType) {
+        encoder.encodeString(value.event)
+    }
+}
diff --git a/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/RunRequest.kt b/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/RunRequest.kt
index a41334bc..dc59bfee 100644
--- a/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/RunRequest.kt
+++ b/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/RunRequest.kt
@@ -48,6 +48,11 @@ public data class RunRequest(
      * Keys can be a maximum of 64 characters long, and values can be a maximum of 512 characters long.
      */
     @SerialName("metadata") val metadata: Map<String, String>? = null,
+
+    /**
+     * Enables streaming events for this run. The client overrides this value based on the API call being made
+     * (streaming calls send `true`, non-streaming calls send `false`).
+     */
+    @SerialName("stream") val stream: Boolean = false
 )

 /**
diff --git a/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/ThreadRunRequest.kt b/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/ThreadRunRequest.kt
index ee013c93..33f527a1 100644
--- a/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/ThreadRunRequest.kt
+++ b/openai-core/src/commonMain/kotlin/com.aallam.openai.api/run/ThreadRunRequest.kt
@@ -50,6 +50,11 @@ public data class ThreadRunRequest(
      * Keys can be a maximum of 64 characters long, and values can be a maximum of 512 characters long.
      */
     @SerialName("metadata") val metadata: Map<String, String>? = null,
+
+    /**
+     * Enables streaming events for this run. The client overrides this value based on the API call being made
+     * (streaming calls send `true`, non-streaming calls send `false`).
+     */
+    @SerialName("stream") val stream: Boolean = false
 )

 /**
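For orientation, the pieces above compose roughly as follows. The snippet below is a minimal consumption sketch, not part of the patch: it assumes the `createStreamingThreadRun`, `AssistantStreamEventType`, `MessageDelta`, and `getData` additions from this change set, a JVM `runBlocking` entry point, and placeholder credentials and assistant ID.

```kotlin
import com.aallam.openai.api.BetaOpenAI
import com.aallam.openai.api.assistant.AssistantId
import com.aallam.openai.api.core.Role
import com.aallam.openai.api.run.AssistantStreamEventType
import com.aallam.openai.api.run.MessageDelta
import com.aallam.openai.api.run.Run
import com.aallam.openai.api.run.ThreadRunRequest
import com.aallam.openai.api.thread.ThreadMessage
import com.aallam.openai.api.thread.ThreadRequest
import com.aallam.openai.client.OpenAI
import com.aallam.openai.client.extension.getData
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.onEach
import kotlinx.coroutines.runBlocking

@OptIn(BetaOpenAI::class)
fun main() = runBlocking {
    val openAI = OpenAI(token = "your-api-key")        // placeholder API key
    openAI.createStreamingThreadRun(
        request = ThreadRunRequest(
            assistantId = AssistantId("asst_abc123"),  // placeholder assistant ID
            thread = ThreadRequest(
                messages = listOf(
                    ThreadMessage(role = Role.User, content = "Explain deep learning to a 5 year old.")
                )
            ),
        )
    )
        .onEach { event ->
            when (event.type) {
                // Partial message content arrives as delta events while the run is in progress.
                AssistantStreamEventType.THREAD_MESSAGE_DELTA ->
                    println(event.getData<MessageDelta>().delta.content)
                // The completed run object is attached to the completion event.
                AssistantStreamEventType.THREAD_RUN_COMPLETED ->
                    println("completed run: ${event.getData<Run>().id}")
                // Step, lifecycle, DONE, and UNKNOWN events are ignored in this sketch.
                else -> Unit
            }
        }
        .collect()
}
```

A real consumer would likely also handle `THREAD_RUN_FAILED`, `ERROR`, and `THREAD_RUN_REQUIRES_ACTION` (feeding the latter into `submitStreamingToolOutput`) rather than ignoring them.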