LLM流式方案解决方案和客户端解决方案

news/2024/7/19 14:32:34 标签: LLM, JS, Java

背景

接上一篇《LLM大模型统一封装接口解决方案》架构确定后,流式方案非常规请求,需要特殊处理。

本解决方案就是针对上一篇中所需要的流式(打字机效果进行编码)

什么是SSE

SSE(Server-Sent Events,服务器发送事件)是一种基于HTTP的服务器到客户端的单向通信技术,用于实现服务器向客户端推送数据的功能。SSE协议标准由HTML5规范定义,并且其定义被包含在HTML Living Standard中。

SSE允许服务器通过HTTP连接向客户端发送数据,而无需客户端发起请求。这使得SSE非常适合于实时通信或推送通知给客户端的应用程序,例如实时股票报价、即时通讯、实时监控等场景。

基本上,SSE由以下要素组成:

  1. 服务器:负责向客户端发送事件流的HTTP服务器。
  2. 客户端:通过浏览器中的EventSource API与服务器建立连接,接收服务器发送的事件。
  3. 事件流(Event Stream):服务器向客户端发送的数据流,格式为纯文本,使用一种特定的格式进行编码,例如MIME类型为"text/event-stream"。

SSE的优点包括简单易用、实现方便、跨浏览器支持良好等。然而,它也有一些限制,例如不能支持双向通信,与WebSocket相比,SSE的实时性稍逊一筹。

Java_21">Java框架说明

pom 文件引入的核心依赖包

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.0</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>aip.com</groupId>
    <artifactId>aip-com</artifactId>
    <version>0.0.1</version>
    <name>aip-com</name>
    <description>aip com project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-webflux</artifactId>
        </dependency>

        <dependency>
            <groupId>io.reactivex.rxjava2</groupId>
            <artifactId>rxjava</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

Java_83">Java后端核心代码

本方法是标准的SSE协议标准


    private final ExecutorService executorService = Executors.newFixedThreadPool(5);

    
    /**
     * 会话请求
     *
     * @return String
     */
    @PostMapping(value = "/completions", consumes = MediaType.APPLICATION_JSON_VALUE)
    @Operation(summary = "会话请求")
    public SseEmitter completions(@RequestBody CompletionRequest completionRequest) {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        SseEmitter emitter = new SseEmitter();

        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 向客户端发送事件
                    emitter.send(
                            SseEmitter.event()
                                    .name("message")
                                    .data(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                            .ended(false)
                                             .message(String.valueOf(i))
                                            .build()))
                    );
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public SseEmitter stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);
        SseEmitter emitter = new SseEmitter();

        executorService.execute(() -> {
            try {
                for (int i = 0; i < 10; i++) {
                    // 向客户端发送事件
                    emitter.send(
                            SseEmitter.event()
                                    .name("message")
                                    .data(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                            .ended(false)
                                             .message(String.valueOf(i))
                                            .build()))
                    );
                    Thread.sleep(1000);
                }
                emitter.complete();
            } catch (Exception e) {
                emitter.completeWithError(e);
            }
        });
        return emitter;

Flux 和 Flowable 对比

Flux 和 Flowable 都是响应式编程库中的数据流类型,用于处理异步和基于事件的流式数据。它们分别来自于不同的库,Flux 是 Reactor 库的一部分,而 Flowable 则是 RxJava 库的一部分。以下是它们之间的一些区别:

  1. 库的来源:

    • Flux 来自于 Reactor 库,是 Reactor 的核心组件之一,React的核心模块用于基于反应式流规范处理数据流。
    • Flowable 来自于 RxJava 库,是 RxJava 的核心类之一,RxJavaJava 平台的反应式扩展库,用于处理异步和基于事件的编程。
  2. 背压策略:

    • Flux 默认采用背压策略为 BUFFER,可以通过 onBackpressureBuffer、onBackpressureDrop、onBackpressureLatest 等方法来指定不同的背压策略。
    • Flowable 默认也是支持背压的,但是相比 Flux,Flowable 提供了更多的背压策略,如 BUFFER、DROP、LATEST、ERROR、MISSING。
  3. 反应式规范:

    • Flux 遵循 Reactor 库的反应式流规范,使用 Mono 和 Flux 来表示异步流和单个结果。
    • Flowable 遵循 RxJava 库的反应式流规范,使用 Observable 和 Flowable 来表示异步流和单个结果。
  4. 生态系统:

    • Reactor 生态系统主要用于基于 Reactor 的应用程序。
    • RxJava 生态系统则更广泛,它是 ReactiveX 的一部分,支持多种语言和平台,并有许多衍生项目。

总的来说,Flux 和 Flowable 在概念上很相似,都用于处理异步和基于事件的流式数据,但它们来自于不同的库,并且有一些细微的区别,如背压策略和生态系统支持。您可以根据项目需求选择适合的库和数据流类型。

JavaFlowable_179">Java后端Flowable方式

本方法是Flowable方式,非标准流式规则

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public Flowable<String> stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);

        Flowable<String> typingFlow = Flowable.create(emitter -> {
            executorService.execute(() -> {
                try {
                    for (int i = 0; i < 10; i++) {

                        emitter.onNext(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                .ended(false)
                                .message(String.valueOf(i))
                                .build()));

                        Thread.sleep(1000);
                    }
                    emitter.onComplete();
                } catch (Exception e) {

                }
            });
        }, BackpressureStrategy.BUFFER);

        return typingFlow;
    }

JavaFlux_217">Java后端Flux方式

本方法是Flux方式,非标准流式规则

    /**
     * 会话请求
     *
     * @return String
     */
    @GetMapping(value = "/stream")
    @Operation(summary = "会话请求")
    public Flux<String> stream() {
        response.setContentType(MediaType.TEXT_EVENT_STREAM_VALUE);

        Flux<String> typingFlow = Flux.create(emitter -> {
            executorService.execute(() -> {
                try {
                    for (int i = 0; i < 10; i++) {

                        emitter.next(JsonHelper.toJSONString(new StreamCompletionResult.Builder()
                                .ended(false)
                                .message(String.valueOf(i))
                                .build()));

                        Thread.sleep(1000);
                    }
                    emitter.complete();
                } catch (Exception e) {

                }
            });
        }, FluxSink.OverflowStrategy.BUFFER);

        return typingFlow;
    }
}

HTML 客户端接收示例程序

function EventSourceGetRequest() SSE 默认方法,只支持GET请求,适合演示用途以及后端包装好服务

function fetchPostRequest() fetch POST 请求实现SSE,支持所有请求(POST,GET等)以及传递参数

sse.html 内容

<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <title>SEE Example</title>
    <script>

        // SSE 默认方法,只支持GET请求
        function EventSourceGetRequest() {
            if(typeof(EventSource)!=="undefined")
            {
                var eventSource = new EventSource('http://127.0.0.1:8090/v1/chat/stream');
                eventSource.onmessage = function(event)
                {
                    document.getElementById('result').insertAdjacentHTML('beforeend', `${event.data}<br/><br/>`);
                    console.log(event)
                };
            }
            else
            {
                document.getElementById("result").innerHTML="抱歉,你的浏览器不支持 server-sent 事件...";
            }
        }

        // fetch POST 请求实现SSE
        function fetchPostRequest() {
            fetch('http://127.0.0.1:8090/v1/chat/completions', {
                method: 'POST',
                headers: {
                    'Content-Type': 'application/json'
                },
                body: JSON.stringify({}),
            })
            .then(response => {
                // 检查响应是否成功
                if (!response.ok) {
                    throw new Error('Network response was not ok');
                }
                // 返回 ReadableStream 对象
                return response.body;
            })
            .then(stream => {
                // 创建一个新的文本解码器
                const decoder = new TextDecoder();
                
                // 获取一个 reader 对象
                const reader = stream.getReader();
                
                let chunk = ''
                
                // 逐块读取数据
                function read() {
                    reader.read().then(({ done, value }) => {
                        if (done) {
                            document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);
                            console.log('Stream has ended');
                            return;
                        }
                        // 将数据块转换为字符串并显示
                        const tmp = decoder.decode(value, { stream: true });
                        if (tmp.startsWith('event:') && chunk!='') {
                            document.getElementById('result').insertAdjacentHTML('beforeend', `${chunk}<hr/>`);
                            chunk = tmp
                        }else{
                            chunk = chunk + tmp
                        }
                        // 继续读取下一块数据
                        read();
                    });
                }
                // 开始读取数据
                read();
            })
            .catch(error => {
                // 处理错误
                console.error('There was a problem with the fetch operation:', error);
            });
        }

        // EventSourceGetRequest();
        fetchPostRequest();
    </script>
</head>
<body>
	<h1>SEE result</h1>
    <div id="result"></div>
</body>
</html>
  • 标准SSE示例

标准SSE

  • 扩展SSE

在这里插入图片描述


http://www.niftyadmin.cn/n/5437670.html

相关文章

D4147——低功率接地故障断路器(GFI)控制器芯片,用于检测危险的接地故障电流路径以及接地对零线故障,可应用于GFCI 和RCD

D4147主要用于三线制GFCI输出接口、GFCI芯片断路器、便携式GFCI线路等领域的产品&#xff0c;侦测并防护火线对地故障和零线对负载短路故障。 功能介绍 D4147 为低功率接地故障断路器&#xff08;GFI&#xff09;控制器芯片&#xff0c;用于检测危险的接地故障电流路径以及接地…

计算机网络的组成

目录 <计算机网络的组成> 1.网络硬件 1)主机Host 2)终端Terminal 3)通信控制处理机 4)传输介质 5)网络连接设备 2.网络软件 1)网络操作系统 2)网络协议软件 3)网络管理软件 4)网络通信软件 5)网络应用软件 3通信子网和资源子网 <计算机网络的组成> 无…

XR虚拟拍摄:短剧制作的新宠

XR虚拟拍摄&#xff1a;短剧制作的新宠 随着数字技术的快速发展&#xff0c;短剧拍摄领域正在经历一场革命性的变革。XR&#xff08;扩展现实&#xff09;技术的兴起&#xff0c;为短剧制作带来了前所未有的机遇与挑战。近年来&#xff0c;越来越多的短剧制作团队开始采用XR虚拟…

JAVA后端调用OpenAI接口 实现打字机效果(SSE)

SSE SSE&#xff08;Server-Sent Events&#xff0c;服务器发送事件&#xff09;是一种基于HTTP协议的通信技术&#xff0c;它允许服务器持续地将数据推送给客户端&#xff0c;而无需客户端发起请求。这种通信方式通常用于实时性要求较高的场景&#xff0c;如实时更新、通知、或…

【R1芯片:Apple Vision Pro中最神秘的角落】

没错&#xff0c;看到标题各位一定已经知道了&#xff0c;本篇文章我们接着来聊一聊Apple Vision Pro。Apple Vision Pro在北美正式发售至今已经过了一个月多了&#xff0c;笔者也是写了数篇文章来讲解了这款Apple全新的空间计算设备&#xff0c;那为什么今天还来探讨Apple Vis…

美食制作手记

美食制作手记 文章目录 美食制作手记1. 电饭煲焖饭系列2. 电饭煲糕点 1. 电饭煲焖饭系列 一周五款不重样的电饭煲焖饭&#xff01;妈妈再也不用担心我没好好吃饭了~ 本系列先预炒&#xff0c;然后倒入生米中一锅出。 老干妈香菇腊肠焖饭&#xff1a; 加工&#xff1a;4根腊肠切…

20240318uniapp怎么引用组件

在script中增加 import index from "/pages/index/index.vue" 把index直接整个作为一个组件引入 然后注册组件 在export default中增加 components: {index:index }, 注册了index组件&#xff0c;内容为import的index 然后就可以在template里使用 <index&…

mybatis拦截器打印sql日志

前言 利用mybatis拦截器打印输出sql 操作 编写拦截器 package com.it2.excel01.interceptor;import java.text.DateFormat; import java.util.Date; import java.util.List; import java.util.Locale; import java.util.Properties;import lombok.extern.slf4j.Slf4j; impo…