跳到主要内容

18-Nest + LangChain 实现基于 SSE 的流式 ai 接口

前言

前面学了 LangChain 的各种功能,但都是在 Node.js 脚本里跑的,而实际上大多数 Agent 都是跑在后端服务里。

比如你和豆包聊天的时候,它会调用 AI 接口,把你的问题传给后端,后端流式返回生成的回答。

这节我们就来学一下 LangChain 和后端框架结合,开发 ai 接口。

我们用 Nest 这个后端框架

我们创建个项目:

npm install -g @nestjs/cli

nest new hello-nest-langchain

具体关于nest这里就不介绍了

使用

安装pnpm install @langchain/core @langchain/openai

生成一个 ai 的模块:nest g res ai --no-spec

然后在 AiService 里调用 langchain 创建一个 chain:

import { Injectable } from '@nestjs/common';
import { ChatOpenAI } from '@langchain/openai';
import { PromptTemplate } from '@langchain/core/prompts';
import type { Runnable } from '@langchain/core/runnables';
import { StringOutputParser } from '@langchain/core/output_parsers';

@Injectable()
export class AiService {
private readonly chain: Runnable;

constructor() {
const prompt = PromptTemplate.fromTemplate('请回答以下问题:\n\n{query}');
const model = new ChatOpenAI({
temperature: 0.7,
modelName: 'qwen-plus',
apiKey: 'sk-xxx',
configuration: {
baseURL: 'https://dashscope.aliyuncs.com/compatible-mode/v1',
},
});
this.chain = prompt.pipe(model).pipe(new StringOutputParser());
}

async runChain(query: string): Promise<string> {
return this.chain.invoke({ query });
}
}

在构造器里创建 ChatModel、chain 避免重复创建。(这里 apikey 之类的先写在代码里,后面优化)

runChain 方法基于传入的参数调用 chain

然后在 AiController 里加一个路由:

import { Controller, Get, Query } from '@nestjs/common';
import { AiService } from './ai.service';

@Controller('ai')
export class AiController {
constructor(private readonly aiService: AiService) {}

@Get('chat')
async chat(@Query('query') query: string) {
const answer = await this.aiService.runChain(query);
return { answer };
}
}

接收 query 参数,调用大模型来回答问题。

跑一下:http://localhost:3000/ai/chat?query=你好

配置抽离

但现在有两个问题:

  • 配置没有抽离
  • 没有流式返回内容

配置的话用这个包:pnpm install @nestjs/config

在 AppModule 里引入 ConfigModule:

import { Module } from '@nestjs/common'
import { AppController } from './app.controller'
import { AppService } from './app.service'
import { AiModule } from './ai/ai.module'
import { ConfigModule } from '@nestjs/config'

@Module({
imports: [
AiModule,
ConfigModule.forRoot({
isGlobal: true,
envFilePath: '.env',
}),
],
controllers: [AppController],
providers: [AppService],
})
export class AppModule {}

它的作用就是读取 .env 配置文件,提供一个 service 来读配置。

isGlobal 设置为 true 就是全局模块,也就是不用 imports 就可以注入里面的 provider

这样我们就可以根目录创建一个 .env 文件,和之前一样:

OPENAI_API_KEY=sk-xxx
OPENAI_BASE_URL=https://dashscope.aliyuncs.com/compatible-mode/v1
MODEL_NAME=qwen-plus

现在配置就可以用 ConfigService 动态读取了:

import { Inject, Injectable } from '@nestjs/common'
import { ChatOpenAI } from '@langchain/openai'
import { PromptTemplate } from '@langchain/core/prompts'
import type { Runnable } from '@langchain/core/runnables'
import { StringOutputParser } from '@langchain/core/output_parsers'
import { ConfigService } from '@nestjs/config'

@Injectable()
export class AiService {
private readonly chain: Runnable

constructor(@Inject(ConfigService) configService: ConfigService) {
const prompt = PromptTemplate.fromTemplate('请回答以下问题:\n\n{query}')
const model = new ChatOpenAI({
temperature: 0.7,
modelName: configService.get('MODEL_NAME'),
apiKey: configService.get('OPENAI_API_KEY'),
configuration: {
baseURL: configService.get('OPENAI_API_URL'),
},
})
this.chain = prompt.pipe(model).pipe(new StringOutputParser())
}

async runChain(query: string): Promise<string> {
return this.chain.invoke({ query })
}
}

流式返回

实现流式返回,这种不断返回内容一般用 SSE(server-sent event) 来做

服务端返回的 Content-Type 是 text/event-stream,这是一个流,可以多次返回内容。

在 AiService 里加一个流式的接口:

async *streamChain(query: string): AsyncGenerator<string> {
const stream = await this.chain.stream({ query });
for await (const chunk of stream) {
yield chunk;
}
}

调用 chain 的 stream 方法,流式返回内容。

这里用到了 js 的生成器语法,也就是方法名那里标个*,然后 yield 不断异步返回内容。

你没用过这个语法也没关系,理解意思就行,过一遍就会了。

然后在 AiController 里调用下这个方法,加一个 chat/stream 接口:

import { Controller, Get, Query, Sse } from '@nestjs/common';
import { AiService } from './ai.service';
import { from, map, Observable } from 'rxjs';

@Controller('ai')
export class AiController {
constructor(private readonly aiService: AiService) {}

@Get('chat')
async chat(@Query('query') query: string) {
const answer = await this.aiService.runChain(query);
return { answer };
}

@Sse('chat/stream')
chatStream(@Query('query') query: string): Observable<{ data: string }> {
return from(this.aiService.streamChain(query)).pipe(
map((chunk) => ({ data: chunk })),
);
}
}

声明接口是 sse 的,然后创建一个 Observable,从 service 的返回流里读取内容,用 map 转成有 data 属性的对象

这个是 rxjs 的写法,Nest 用 rxjs 来处理异步流。

其实和 LCEL 的声明式写法思路一样,就是声明对这个流做什么处理

跑一下,可以看到,通过 sse 的接口就可以流式的返回内容了。

我们写一下前端代码,有的同学可能不知道 sse 的接口怎么调用;

btn.addEventListener('click', () => {
const baseUrl = apiUrlInput.value.replace(/\/$/, '')
const q = queryInput.value.trim()
if (!q) {
status.textContent = '请输入问题'
return
}

const url = `${baseUrl}/ai/chat/stream?query=${encodeURIComponent(q)}`
output.textContent = ''
btn.disabled = true
status.textContent = '连接中...'

const eventSource = new EventSource(url)

eventSource.onmessage = ({ data }) => {
output.textContent += data
status.textContent = '接收中...'
}

eventSource.onerror = () => {
eventSource.close()
btn.disabled = false
status.textContent = '连接已结束'
}

eventSource.addEventListener('done', () => {
eventSource.close()
btn.disabled = false
status.textContent = '完成'
})
})

就是调用 EventSource 的 api,在 onmessage 回调里接收 data 就可以了。

总结

这节我们学了 Nest + LangChain 来开发 ai 接口。

Nest 是一个 Node.js 生态最主流的后端开发框架,提供了 MVC、DI 等特性。

  • 通过 module 来拆分代码,每个 module 包含 service、controller 等。
  • 实现了 DI 依赖注入,通过 @Injectable 声明的 Service,通过 useFactory 创建的对象,都可以作为 provider 来注入。

注入方式包含构造器注入,也就是声明在参数里,以及属性注入,也就是 @Inject 的方式注入

我们基于 LangChain 写了几个 ai 接口:

ChatModel 用 useFactory 创建 provider 来注入。

chain 定义在构造器里,避免重复创建。

同步和流式分别调用 invoke 和 stream 方法。

在 service 里用生成器语法异步返回内容,然后在 controller 创建了一个 sse 的接口,用 rxjs 的 Observable 返回流式数据。

前端代码用 EventSource 来监听 sse 的 message 事件,拿到流式返回的数据。

SSE 在 ai 接口流式返回内容方面是最常用的方式,后面会经常用到。