Skip to content

Stream 插件

¥Stream Plugin

警告

此插件处于维护模式,不会添加新功能。我们建议使用 改为使用 Generator Stream

¥This plugin is in maintenance mode and will not receive new features. We recommend using the Generator Stream instead

此插件添加了对流式响应或将服务器发送事件发送回客户端的支持。

¥This plugin adds support for streaming response or sending Server-Sent Event back to the client.

使用以下工具安装:

¥Install with:

bash
bun add @elysiajs/stream

然后使用它:

¥Then use it:

typescript
import { Elysia } from 'elysia'
import { Stream } from '@elysiajs/stream'

new Elysia()
    .get('/', () => new Stream(async (stream) => {
        stream.send('hello')

        await stream.wait(1000)
        stream.send('world')

        stream.close()
    }))
    .listen(3000)

默认情况下,Stream 将返回 Responsecontent-typetext/event-stream; charset=utf8

¥By default, Stream will return Response with content-type of text/event-stream; charset=utf8.

构造函数

¥Constructor

以下是 Stream 接受的构造函数参数:

¥Below is the constructor parameter accepted by Stream:

  1. Stream:

    • 自动:根据提供的值自动流式响应

      • 可迭代对象

      • AsyncIterable

      • ReadableStream

      • 响应

    • 手动:(stream: this) => unknownundefined 的回调

  2. 选项:StreamOptions

    • event:用于标识所描述事件类型的字符串

    • retry:重新连接时间(以毫秒为单位)

方法

¥Method

以下是 Stream 提供的方法:

¥Below is the method provided by Stream:

send

将数据排队到流中以发送回客户端

¥Enqueue data to stream to send back to the client

close

关闭流

¥Close the stream

wait

返回一个以毫秒为单位解析提供的值的 Promise。

¥Return a promise that resolves in the provided value in ms

value

ReadableStream 的内部值

¥Inner value of the ReadableStream

模式

¥Pattern

以下是使用该插件的常见模式。

¥Below you can find the common patterns to use the plugin.

OpenAI

当参数为 IterableAsyncIterable 时,将触发自动模式,并自动将响应流式传输回客户端。

¥Automatic mode is triggered when the parameter is either Iterable or AsyncIterable streaming the response back to the client automatically.

以下是将 ChatGPT 集成到 Elysia 的示例。

¥Below is an example of integrating ChatGPT into Elysia.

ts
new Elysia()
    .get(
        '/ai',
        ({ query: { prompt } }) =>
            new Stream(
                openai.chat.completions.create({
                    model: 'gpt-3.5-turbo',
                    stream: true,
                    messages: [{
                        role: 'user',
                        content: prompt
                    }]
                })
            )
    )

默认情况下,openai chatGPT 完成返回 AsyncIterable,因此你应该能够将 OpenAI 封装在 Stream 中。

¥By default openai chatGPT completion returns AsyncIterable so you should be able to wrap the OpenAI in Stream.

获取流

¥Fetch Stream

你可以将从返回流的端点的获取传递给代理流。

¥You can pass a fetch from an endpoint that returns the stream to proxy a stream.

这对于使用 AI 文本生成的端点非常有用,因为你可以直接代理它,例如。 Cloudflare AI

¥This is useful for those endpoints that use AI text generation since you can proxy it directly, eg. Cloudflare AI.

ts
const model = '@cf/meta/llama-2-7b-chat-int8'
const endpoint = `https://api.cloudflare.com/client/v4/accounts/${process.env.ACCOUNT_ID}/ai/run/${model}`

new Elysia()
    .get('/ai', ({ query: { prompt } }) =>
        fetch(endpoint, {
            method: 'POST',
            headers: {
                authorization: `Bearer ${API_TOKEN}`,
                'content-type': 'application/json'
            },
            body: JSON.stringify({
                messages: [
                    { role: 'system', content: 'You are a friendly assistant' },
                    { role: 'user', content: prompt }
                ]
            })
        })
    )

服务器发送事件

¥Server Sent Event

当参数为 callbackundefined 时,将触发手动模式,让你可以控制流。

¥Manual mode is triggered when the parameter is either callback or undefined, allowing you to control the stream.

callback-based

以下是使用构造函数回调创建服务器发送事件端点的示例。

¥Below is an example of creating a Server-Sent Event endpoint using a constructor callback

ts
new Elysia()
    .get('/source', () =>
        new Stream((stream) => {
            const interval = setInterval(() => {
                stream.send('hello world')
            }, 500)

            setTimeout(() => {
                clearInterval(interval)
                stream.close()
            }, 3000)
        })
    )

value-based

以下是使用基于值的构造函数创建服务器发送事件端点的示例。

¥Below is an example of creating a Server-Sent Event endpoint using a value-based

ts
new Elysia()
    .get('/source', () => {
        const stream = new Stream()

        const interval = setInterval(() => {
            stream.send('hello world')
        }, 500)

        setTimeout(() => {
            clearInterval(interval)
            stream.close()
        }, 3000)

        return stream
    })

基于回调和基于值的流的工作方式相同,但语法有所不同,以根据你的偏好设置。

¥Both callback-based and value-based streams work in the same way but with different syntax for your preference.