Skip to content

Commit

Permalink
feat(stream-text): add onChunk() (#27)
Browse files Browse the repository at this point in the history
* feat(stream-text): add onChunk()

* chore(stream-text): update test

* chore(stream-text): update test snapshot
  • Loading branch information
kwaa authored Jan 15, 2025
1 parent 2bc1733 commit 4d5a1e4
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 13 deletions.
28 changes: 16 additions & 12 deletions packages/stream-text/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import {
} from '@xsai/shared-chat'

export interface StreamTextOptions extends ChatOptions {
onChunk?: (chunk: StreamTextResponse) => Promise<void> | void
/** if you want to disable stream, use `@xsai/generate-{text,object}` */
stream?: never
streamOptions?: {
Expand Down Expand Up @@ -48,8 +49,8 @@ export interface StreamTextResponse {
usage?: StreamTextResponseUsage
}

const dataHeaderPrefix = 'data: '
const dataErrorPrefix = `{"error":`
const chunkHeaderPrefix = 'data: '
const chunkErrorPrefix = `{"error":`

/**
* @experimental WIP, does not support function calling (tools).
Expand All @@ -66,7 +67,7 @@ export const streamText = async (options: StreamTextOptions): Promise<StreamText

// null body handled by import('@xsai/shared-chat').chat()
const rawChunkStream = res.body!.pipeThrough(new TransformStream({
transform: (chunk, controller) => {
transform: async (chunk, controller) => {
buffer += decoder.decode(chunk)
const lines = buffer.split('\n\n')
buffer = lines.pop() || ''
Expand All @@ -75,30 +76,33 @@ export const streamText = async (options: StreamTextOptions): Promise<StreamText
// Some cases:
// - Empty chunk
// - :ROUTER PROCESSING from OpenRouter
if (!line || !line.startsWith(dataHeaderPrefix)) {
if (!line || !line.startsWith(chunkHeaderPrefix)) {
continue
}

if (line.startsWith(dataErrorPrefix)) {
if (line.startsWith(chunkErrorPrefix)) {
// About controller error: https://developer.mozilla.org/en-US/docs/Web/API/TransformStreamDefaultController/error
controller.error(new Error(`Error from server: ${line}`))
break
}

const lineWithoutPrefix = line.slice(dataHeaderPrefix.length)
const lineWithoutPrefix = line.slice(chunkHeaderPrefix.length)
if (lineWithoutPrefix === '[DONE]') {
controller.terminate()
break
}

const data: StreamTextResponse = JSON.parse(lineWithoutPrefix)
controller.enqueue(data)
const chunk: StreamTextResponse = JSON.parse(lineWithoutPrefix)
controller.enqueue(chunk)

if (data.choices[0].finish_reason) {
finishReason = data.choices[0].finish_reason
if (options.onChunk)
await options.onChunk(chunk)

if (chunk.choices[0].finish_reason) {
finishReason = chunk.choices[0].finish_reason
}
if (data.usage) {
usage = data.usage
if (chunk.usage) {
usage = chunk.usage
}
}
},
Expand Down
2 changes: 2 additions & 0 deletions packages/stream-text/test/__snapshots__/index.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -185,3 +185,5 @@ exports[`@xsai/stream-text > the-quick-brown-fox 2`] = `
},
]
`;

exports[`@xsai/stream-text > the-quick-brown-fox 3`] = `11`;
8 changes: 7 additions & 1 deletion packages/stream-text/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@ describe('@xsai/stream-text', () => {
})

it('the-quick-brown-fox', async () => {
let onChunkCount = 0
let chunkCount = 0

const { chunkStream, textStream } = await streamText({
...ollama.chat('llama3.2'),
messages: [
Expand All @@ -43,6 +46,7 @@ describe('@xsai/stream-text', () => {
role: 'user',
},
],
onChunk: () => { onChunkCount++ },
})

const chunk: StreamTextResponse[] = []
Expand All @@ -54,6 +58,7 @@ describe('@xsai/stream-text', () => {
created: undefined,
id: undefined,
}))
chunkCount++
}

for await (const textPart of textStream) {
Expand All @@ -62,8 +67,9 @@ describe('@xsai/stream-text', () => {

expect(text.join('')).toBe('The quick brown fox jumps over the lazy dog.')
expect(text).toMatchSnapshot()

expect(chunk).toMatchSnapshot()

expect(onChunkCount).toMatchSnapshot(chunkCount)
})

// TODO: error handling
Expand Down

0 comments on commit 4d5a1e4

Please sign in to comment.