我们要开发的一个流程:用户说话=>livekit转发=>Go后端接收=>asr实时转文字=>llm回答问题=>tts实时文字合成音频=>音频推送到livekit=>用户听到tts的声音。只是一个最基础的实现,走通流程,没有复杂的说话打断等功能。
LiveKit的sdk非常全,主流的编程语言、前端、后端的都有,开发起来非常简单,而且官方提供了非常多且详细的代码示例。 这里我们继续用Golang连接LiveKit会议室,接收会议室音频,进行asr语音识别,并把识别结果输入大模型,最终把响应打印在控制台(后续会接上tts进行语音合成和播报)。
环境准备:
因为这次用到了opus音频编解码Cgo库,所以只能在Linux下运行。 livekit的go sdk内部opus编解码库使用的 https://github.com/hraban/opus ,需要安装很多linux C组件。
我们本文的代码主要参考并修改了官方的代码示例:https://github.com/livekit/server-sdk-go/tree/main/examples/openai_realtime_voice 官方写的已经很全了,是一个对接OpenAI进行语音对话的流程,我们这里改成了对接第三方ASR(腾讯语音)、第三方LLM大模型
本文代码在这里 https://github.com/finch-xu/livekit-voice-chat-llm-agent
明确一些概念:
livekit房间轨道,包括音频、视频两种。
轨道又分远程轨道、本地轨道两种,对于你的程序来说,你从远程轨道订阅房间的媒体数据流,你使用本地轨道推送媒体流到房间中,分享给其他房间参与者。
每种轨道可以有多个,比如摄像头轨道、桌面共享轨道(也是视频轨道)等等。sdk里都有回调和标识,能清晰告诉你读取哪个轨道。
音频轨道默认编码是Opus,视频轨道的默认编码是VP8。
1. 安装依赖 这些依赖主要是https://github.com/hraban/opus 这个包在用,可以参考它的文档。
1 apt-get install -y pkg-config libopus-dev libopusfile-dev libsoxr-dev
2. 订阅LiveKit音频流并输入ASR识别 因为WebRTC本身是不支持原始PCM格式的(只支持PCM的变种),LiveKit房间默认音频流是Opus格式,但是官方在最近的sdk版本中更新了PCM订阅和发布的本地轨道,也就实现了对原始PCM流的直接操作,不需要我们自己手动进行Opus和PCM的编解码、采样率提升等复杂操作。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 func callbacksForLkRoom (handler *ASRAPIHandler) *lksdk.RoomCallback { var pcmRemoteTrack *lkmedia.PCMRemoteTrack return &lksdk.RoomCallback{ ParticipantCallback: lksdk.ParticipantCallback{ OnTrackSubscribed: func (track *webrtc.TrackRemote, publication *lksdk.RemoteTrackPublication, rp *lksdk.RemoteParticipant) { if pcmRemoteTrack != nil { return } pcmRemoteTrack, _ = handleSubscribe(track, handler) }, }, } } func handleSubscribe (track *webrtc.TrackRemote, handler *ASRAPIHandler) (*lkmedia.PCMRemoteTrack, error ) { if track.Codec().MimeType != webrtc.MimeTypeOpus { logger.Warnw("Received non-opus track" , nil , "track" , track.Codec().MimeType) } writer := NewRemoteTrackWriter(handler) trackWriter, err := lkmedia.NewPCMRemoteTrack(track, writer, lkmedia.WithTargetSampleRate(16000 )) if err != nil { logger.Errorw("Failed to create remote track" , err) return nil , err } return trackWriter, nil }
官方的例子realtime_api_handler.go中,直接把音频流输入给了OpenAI,输出也是音频流并直接推送到管道并推送到房间中。我们这里截胡音频,先把音频流写入ASR。
关于腾讯语音识别接口,有很多种,我们这里使用的实时语音识别(WebSocket),因为要进行实时流式识别。
实时语音识别的接口一般会先返回非稳态结果(就是识别的中间结果),这个结果我们可以抛弃,等待稳态结果再传给大模型。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 func (h *ASRAPIHandler) SendAudioChunk(sample media.PCM16Sample) error { bytes := make ([]byte , len (sample)*2 ) for i, s := range sample { binary.LittleEndian.PutUint16(bytes[i*2 :], uint16 (s)) } return h.conn.WriteMessage(websocket.BinaryMessage, bytes) } func (h *ASRAPIHandler) readMessages() { defer func () { h.Close() }() for { _, message, err := h.conn.ReadMessage() if err != nil { logger.Errorw("Error reading message" , err) break } fmt.Printf("asr resp text message: %v\n" , string (message)) var tmpRespBody respBody err = json.Unmarshal(message, &tmpRespBody) if err != nil { logger.Errorw("Error reading message" , err) break } voiceTextStr := tmpRespBody.Result.VoiceTextStr fmt.Printf("voiceTextStr: %s\n" , voiceTextStr) if tmpRespBody.Result.SliceType == 2 { h.ch <- voiceTextStr } } }
3. 大模型读取ASR结果并进行响应 这一块没啥好说的,就是goroutine持续监听,有ASR结果推送过来,就请求大模型,并解析SSE的流式结果。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 func llm (asrToLLMChan chan string , dataStreamChan chan string ) { for { select { case text, ok := <-asrToLLMChan: if !ok { return } requestLLM(text, dataStreamChan) } } } func requestLLM (query string , llmToTTSChan chan string ) { ... if choices, ok := event["choices" ].([]interface {}); ok && len (choices) > 0 { if choice, ok := choices[0 ].(map [string ]interface {}); ok { if delta, ok := choice["delta" ].(map [string ]interface {}); ok { if content, ok := delta["content" ].(string ); ok { fmt.Println("Extracted content:" , content) llmToTTSChan <- content } } } } }
4. 大模型SSE结果流式推送TTS实时语音合成 tts的思路就是启动两个goroutine,一个持续监听大模型的消息推送chan并把文本实时推送给tts,一个持续监听tts接口的音频流并推送到livekit房间中。
我用的腾讯语音合成通用语音合成相关接口>流式文本语音合成接口,专门面向大模型流式的,我的代码里已经写了完整对接的代码,你也可以自行参考官方文档 https://cloud.tencent.com/document/product/1073/108595
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 func (h *TTSAPIHandler) sendText() error { defer func () { h.Close() }() for { select { case text := <-h.inCh: body := map [string ]interface {}{ "action" : "ACTION_SYNTHESIS" , "data" : text, } bytes, _ := json.Marshal(body) err := h.conn.WriteMessage(websocket.TextMessage, bytes) if err != nil { return err } } } } func (h *TTSAPIHandler) readMessages() error { defer func () { h.Close() }() for { messageType, message, err := h.conn.ReadMessage() if err != nil { logger.Errorw("Error reading message" , err) return err } if messageType == websocket.BinaryMessage { h.outCh <- message } } }
tts websocket接口返回的是原始PCM包,我们得转成livekit封装的格式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 func handlePublish (room *lksdk.Room, ttsAudioChan chan []byte ) { publishTrack, err := lkmedia.NewPCMLocalTrack(16000 , 1 , logger.GetLogger()) if err != nil { panic (err) } defer func () { publishTrack.ClearQueue() publishTrack.Close() }() if _, err = room.LocalParticipant.PublishTrack(publishTrack, &lksdk.TrackPublicationOptions{ Name: "tts audio" , }); err != nil { panic (err) } for sample := range ttsAudioChan { audioPCM16 := make (media.PCM16Sample, len (sample)/2 ) for i := 0 ; i < len (sample); i += 2 { audioPCM16[i/2 ] = int16 (binary.LittleEndian.Uint16(sample[i : i+2 ])) } if err := publishTrack.WriteSample(audioPCM16); err != nil { logger.Errorw("Failed to write sample" , err) } } }
5. 把整体流程串起来 创建一些管道chan,我们需要如下数据转发:
livekit Opus音频推送给asr pcm websocket接口,直接监听livekit房间回调,不需要创建chan
asr websocket返回文本消息,文本推送给llm程序,创建asrToLLMChan
llm sse结果流式推送给tts websocket接口,创建llmToTTSChan
tts websocket返回pcm流,转成PCM16Sample流,推送到livekit放进,创建ttsAudioChan
1 2 3 4 5 6 7 8 9 10 11 12 13 asrToLLMChan := make (chan string ) defer close (asrToLLMChan)llmToTTSChan := make (chan string ) defer close (llmToTTSChan)ttsAudioChan := make (chan []byte ) defer close (ttsAudioChan)
启动asr、llm、tts等模块。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 handler, err := NewASRAPIHandler(asrToLLMChan) if err != nil { panic (err) } defer handler.Close()go llm(asrToLLMChan, llmToTTSChan)room, err := connectToLKRoom(callbacksForLkRoom(handler)) if err != nil { panic (err) } defer room.Disconnect()NewTTSAPIHandler(llmToTTSChan, ttsAudioChan) go handlePublish(room, ttsAudioChan)
完整的代码放在了这里 https://github.com/finch-xu/livekit-voice-chat-llm-agent 整体流程都挺简单的,没啥可写的,所以刚才只说了一些细节。
6. 开始对话 6.1 启动后端 还是在上文的Ubuntu系统中执行。
1 2 3 4 5 6 go env -w GOPROXY=https://goproxy.cn,direct go mod tidy go build -tags cgo -o main ./main
6.2 前端进行语音对话 给前端签一个livekit token,执行deploy/livekit/gen_room_token.go,记得把里边的apiKey,apiSecret换成你部署livekit设置的值。
我们使用网页作为前端客户端来测试 https://meet.livekit.io/ ,点击Custom选项卡,填写LiveKit Server URL: ws://192.168.0.1:7880(就是你刚才部署的ip和端口号),token复制进去,然后点击Connect进入会议室,进入后,你可以打开麦克风和摄像头,基础功能和腾讯会议类似。
进入会议室后,房间里有你和后端两个参会者,在页面上打开麦克风说话,会在后端的日志中打印asr结果、大模型响应结果,然后tts会在会议室里播报,至此完成了一个基于webrtc的大模型语音对话服务。
参考 下边是几种部署livekit的方式
livekit server go sdk https://github.com/livekit/server-sdk-go 腾讯语音合成TTS https://cloud.tencent.com/product/tts 腾讯语音识别ASR https://cloud.tencent.com/product/asr