C++ 接入 Qwen-Omni-Realtime 的问题
C++ 代码在发送 input_audio_buffer.commit 之后,连接就被服务端关闭了。音频数据本身没有问题,同样流程的 Java 代码可以正常运行。
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <fstream>
#include <iostream>
#include <iterator>
#include <random>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

// Boost.Asio + Beast
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>

#include "json.hpp"
using json = nlohmann::json;
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;
// Hand-written Base64 encoder (kept compatible with the Java client's encoding).
// Alphabet: the standard RFC 4648 table -- 'A'-'Z', 'a'-'z', '0'-'9', '+', '/'.
static const std::string base64_chars =
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz"
"0123456789+/";

// Encodes `len` bytes starting at `data` as padded Base64 text.
// Each complete 3-byte group becomes 4 characters. A trailing group of 1 or 2
// bytes becomes 2 or 3 characters followed by "==" or "=" respectively.
// An empty input (len == 0) yields an empty string.
std::string base64_encode(const uint8_t* data, size_t len) {
    std::string encoded;
    encoded.reserve((len + 2) / 3 * 4);

    size_t pos = 0;
    for (; pos + 3 <= len; pos += 3) {
        // Pack three bytes into one 24-bit value, then emit it as four 6-bit digits.
        const uint32_t group = (static_cast<uint32_t>(data[pos]) << 16) |
                               (static_cast<uint32_t>(data[pos + 1]) << 8) |
                               static_cast<uint32_t>(data[pos + 2]);
        encoded += base64_chars[(group >> 18) & 0x3f];
        encoded += base64_chars[(group >> 12) & 0x3f];
        encoded += base64_chars[(group >> 6) & 0x3f];
        encoded += base64_chars[group & 0x3f];
    }

    const size_t remaining = len - pos;
    if (remaining != 0) {
        // Missing low-order bytes are treated as zero, as in the padded encoding.
        uint32_t group = static_cast<uint32_t>(data[pos]) << 16;
        if (remaining == 2) {
            group |= static_cast<uint32_t>(data[pos + 1]) << 8;
        }
        encoded += base64_chars[(group >> 18) & 0x3f];
        encoded += base64_chars[(group >> 12) & 0x3f];
        if (remaining == 2) {
            encoded += base64_chars[(group >> 6) & 0x3f];
        } else {
            encoded += '=';
        }
        encoded += '=';
    }
    return encoded;
}
// Reads the whole file at `filepath` in binary mode and returns its bytes.
// Throws std::runtime_error if the file cannot be opened.
std::vector<uint8_t> read_binary_file(const std::string& filepath) {
    std::ifstream input(filepath, std::ios::binary);
    if (!input.is_open()) {
        throw std::runtime_error("无法打开文件: " + filepath);
    }
    std::istreambuf_iterator<char> first(input);
    std::istreambuf_iterator<char> last;
    std::vector<uint8_t> bytes(first, last);
    return bytes;
}
// Writes `data` to `filepath` in binary mode, replacing any existing contents.
// Throws std::runtime_error if the file cannot be created, or if writing or
// flushing the data fails (e.g. disk full). Previously such failures were
// silently ignored.
void write_binary_file(const std::string& filepath, const std::vector<uint8_t>& data) {
    std::ofstream file(filepath, std::ios::binary);
    if (!file) {
        throw std::runtime_error("无法创建文件: " + filepath);
    }
    if (!data.empty()) {
        file.write(reinterpret_cast<const char*>(data.data()),
                   static_cast<std::streamsize>(data.size()));
    }
    // Flush explicitly so buffered-write errors surface here instead of being
    // lost in the destructor.
    file.flush();
    if (!file) {
        throw std::runtime_error("写入文件失败: " + filepath);
    }
}
// Builds a client event identifier: the prefix "event_" followed by 16
// characters drawn uniformly from [a-zA-Z0-9].
// The engine is seeded once from std::random_device. It is shared by all calls
// through function-local statics without locking, so this is not thread-safe.
std::string generate_event_id() {
    static const std::string alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    static std::random_device seed_source;
    static std::mt19937 engine(seed_source());
    static std::uniform_int_distribution<> pick(0, static_cast<int>(alphabet.size()) - 1);

    constexpr int kSuffixLength = 16;
    std::string result = "event_";
    result.reserve(result.size() + kSuffixLength);
    for (int n = 0; n < kSuffixLength; ++n) {
        result.push_back(alphabet[pick(engine)]);
    }
    return result;
}
// Decodes standard (RFC 4648) Base64 text into raw bytes. Decoding stops at the
// first '=' padding character; characters outside the Base64 alphabet (such as
// line breaks) are skipped.
static std::vector<uint8_t> decode_base64_payload(const std::string& text) {
    std::vector<uint8_t> bytes;
    bytes.reserve(text.size() / 4 * 3);
    uint32_t bit_buffer = 0;  // only the low (bit_count) bits are meaningful
    int bit_count = 0;
    for (const char c : text) {
        int sextet;
        if (c >= 'A' && c <= 'Z') {
            sextet = c - 'A';
        } else if (c >= 'a' && c <= 'z') {
            sextet = c - 'a' + 26;
        } else if (c >= '0' && c <= '9') {
            sextet = c - '0' + 52;
        } else if (c == '+') {
            sextet = 62;
        } else if (c == '/') {
            sextet = 63;
        } else if (c == '=') {
            break;
        } else {
            continue;
        }
        bit_buffer = (bit_buffer << 6) | static_cast<uint32_t>(sextet);
        bit_count += 6;
        if (bit_count >= 8) {
            bit_count -= 8;
            bytes.push_back(static_cast<uint8_t>((bit_buffer >> bit_count) & 0xFFu));
        }
    }
    return bytes;
}

// Reads server events from `ws` and reports them on stdout/stderr. It stops on
// the first of:
//   - a `response.done` event;
//   - an `error` event;
//   - the connection being closed or failing;
//   - about 60 seconds having elapsed.
// Text and transcript deltas are concatenated and printed when the response is
// done. Audio deltas (Base64 in `delta`) are decoded and, if any arrived, saved
// as raw bytes to `output_audio_file`. The sample format is whatever the session
// requested via output_audio_format; it is not checked here.
void process_responses(websocket::stream<ssl::stream<tcp::socket>>& ws,
const std::string& output_audio_file) {
    std::string full_text;
    std::vector<uint8_t> audio_buffer;
    bool response_completed = false;
    bool aborted = false;  // closed, read error, or server error (not a timeout)
    const auto start_time = std::chrono::steady_clock::now();
    // NOTE: ws.read() blocks, so the 60 s limit is only checked between
    // messages; a server that goes silent can still stall this loop.
    while (!response_completed &&
           std::chrono::steady_clock::now() - start_time < std::chrono::seconds(60)) {
        beast::flat_buffer response_buffer;
        beast::error_code ec;
        ws.read(response_buffer, ec);
        if (ec) {
            if (ec == websocket::error::closed) {
                // The close frame's code/reason is the only explanation the
                // server gives for ending the session, so always report it.
                const auto& cr = ws.reason();
                std::cout << "ℹ️ 连接已关闭\n";
                std::cout << "   关闭代码: " << cr.code
                          << ", 原因: " << std::string(cr.reason.data(), cr.reason.size()) << "\n";
            } else {
                std::cerr << "⚠️ 读取错误: " << ec.message() << "\n";
            }
            aborted = true;
            break;
        }
        const std::string response_msg = beast::buffers_to_string(response_buffer.data());
        // Non-throwing parse: one malformed frame should not abort the exchange.
        json response_data = json::parse(response_msg, nullptr, false);
        if (response_data.is_discarded() || !response_data.is_object()) {
            std::cerr << "⚠️ 无法解析的消息: " << response_msg << "\n";
            continue;
        }
        const std::string type = response_data.value("type", "");
        std::cout << "📥 收到事件: " << type << "\n";
        if (type == "session.created") {
            std::cout << "✅ Session 创建成功! ID: "
                      << response_data["session"]["id"] << "\n";
        } else if (type == "session.updated") {
            std::cout << "✅ Session 更新成功!\n";
        } else if (type == "input_audio_buffer.committed") {
            std::cout << "✅ 音频提交已确认\n";
        } else if (type == "conversation.item.created") {
            std::cout << "💬 对话项已创建\n";
        } else if (type == "conversation.item.input_audio_transcription.completed") {
            if (response_data.contains("transcript")) {
                std::string transcript = response_data["transcript"].get<std::string>();
                std::cout << "🎤 语音识别结果: " << transcript << "\n";
            }
        } else if (type == "response.created") {
            std::cout << "✅ 响应已创建\n";
        } else if (type == "response.text.delta") {
            if (response_data.contains("delta")) {
                std::string delta = response_data["delta"].get<std::string>();
                full_text += delta;
                std::cout << "💬 " << delta << std::flush;
            }
        } else if (type == "response.audio.delta") {
            if (response_data.contains("delta")) {
                const std::vector<uint8_t> chunk =
                    decode_base64_payload(response_data["delta"].get<std::string>());
                audio_buffer.insert(audio_buffer.end(), chunk.begin(), chunk.end());
                std::cout << "🔊 收到音频数据\n";
            }
        } else if (type == "response.audio_transcript.delta") {
            if (response_data.contains("delta")) {
                std::string delta = response_data["delta"].get<std::string>();
                full_text += delta;
                std::cout << "📄 " << delta << std::flush;
            }
        } else if (type == "response.done") {
            std::cout << "\n\n✅ AI 响应完成!\n";
            if (!full_text.empty()) {
                std::cout << "📝 完整文本回答: " << full_text << "\n";
            }
            response_completed = true;
        } else if (type == "error") {
            // Guard against a missing or non-object "error" field; calling
            // value() on a null json would throw.
            std::string error_msg = "未知错误";
            const auto err_it = response_data.find("error");
            const bool has_error_obj = err_it != response_data.end() && err_it->is_object();
            if (has_error_obj) {
                error_msg = err_it->value("message", error_msg);
            }
            std::cerr << "❌ 服务端错误: " << error_msg << "\n";
            // The code may sit inside the error object or at the top level.
            if (has_error_obj && err_it->contains("code")) {
                std::cerr << " 错误代码: " << (*err_it)["code"] << "\n";
            } else if (response_data.contains("code")) {
                std::cerr << " 错误代码: " << response_data["code"] << "\n";
            }
            aborted = true;
            break;
        } else {
            std::cout << "📥 其他事件: " << response_msg << "\n";
        }
    }
    if (!audio_buffer.empty()) {
        // Saving audio is best-effort: report a failure but keep going.
        try {
            write_binary_file(output_audio_file, audio_buffer);
            std::cout << "💾 已保存 " << audio_buffer.size() << " 字节音频到 "
                      << output_audio_file << "\n";
        } catch (const std::exception& e) {
            std::cerr << "⚠️ 保存音频失败: " << e.what() << "\n";
        }
    }
    // Only report a timeout when the loop actually ran out of time.
    if (!response_completed && !aborted) {
        std::cout << "⚠️ 响应未在超时时间内完成\n";
    }
}
int main() {
#ifdef _WIN32
SetConsoleOutputCP(CP_UTF8);
#endif
const int MAX_RETRIES = 1;
int retry_count = 0;
std::string apiKey = "";
std::string input_audio_file = "F:/q1_16khz.pcm";
std::string output_audio_file = "F:/response.pcm";
while (retry_count < MAX_RETRIES) {
try {
std::string host = "dashscope.aliyuncs.com";
std::string port = "443";
std::string target = "/api-ws/v1/realtime?model=qwen3-omni-flash-realtime";
std::cout << "🔧 配置信息:\n";
std::cout << " API Key: sk-******* (已隐藏部分)\n";
std::cout << " 主机: " << host << "\n";
std::cout << " 端口: " << port << "\n";
std::cout << " 目标: " << target << "\n\n";
// ===== 初始化网络上下文 =====
net::io_context ioc;
ssl::context ctx(ssl::context::tlsv13_client);
ctx.set_default_verify_paths();
// ===== 解析域名 =====
tcp::resolver resolver(ioc);
auto const results = resolver.resolve(host, port);
std::cout << "🔍 正在解析域名: " << host << ":" << port << "\n";
// ===== 创建SSL连接 =====
ssl::stream<tcp::socket> stream(ioc, ctx);
net::connect(stream.next_layer(), results.begin(), results.end());
std::cout << "🔌 已建立TCP连接\n";
// ===== 设置SNI =====
if (!SSL_set_tlsext_host_name(stream.native_handle(), host.c_str())) {
throw std::runtime_error("设置SNI失败");
}
// ===== SSL握手 =====
stream.handshake(ssl::stream_base::client);
std::cout << "🤝 SSL握手成功\n";
// ===== 创建WebSocket =====
websocket::stream<ssl::stream<tcp::socket>> ws(std::move(stream));
// ===== 禁用压缩 =====
websocket::permessage_deflate pmd;
pmd.client_enable = false;
pmd.server_enable = false;
ws.set_option(pmd);
// ===== 设置HTTP头(与Java保持一致)=====
ws.set_option(websocket::stream_base::decorator(
[&apiKey](websocket::request_type& req) {
req.set(beast::http::field::authorization, "Bearer " + apiKey);
req.set(beast::http::field::user_agent,
"dashscope/2.17.0; java/21.0.6; platform/Windows 11; processor/amd64");
}
));
// ===== WebSocket握手 =====
ws.handshake(host, target);
std::cout << "✅ 成功连接到 Qwen 实时服务\n\n";
beast::flat_buffer buffer;
ws.read(buffer);
std::string message = beast::buffers_to_string(buffer.data());
auto data = json::parse(message);
std::string type = data["type"];
std::cout << "create:" << type << std::endl;
// ===== 发送 session.update =====
json sessionUpdate = {
{"event_id", generate_event_id()},
{"type", "session.update"},
{"session", {
{"modalities", {"audio", "text"}}, // 注意:顺序与Java一致
{"voice", "Cherry"},
{"input_audio_format", "pcm16"},
{"output_audio_format", "pcm24"},
{"instructions", "你是私人AI高级助手,请准确且友好地解答用户关于天气、网上实时信息、日期、旅游线路规划。请始终以专业和乐于助人的态度回应。"},
{"turn_detection", nullptr}, // 重要:使用nullptr而不是"null"
{"input_audio_transcription", {
{"model", "gummy-realtime-v1"}
}}
}}
};
std::string event_id = generate_event_id();
std::string updateJson =
R"({"event_id":")" + event_id
+ R"(","type":"session.update","session":{"voice":"Cherry","input_audio_format":"pcm16","instructions":"你是私人AI高级助手,请准确且友好地解答用户关于天气、网上实时信息、日期、旅游线路规划。请始终以专业和乐于助人的态度回应。","modalities":["audio","text"],"output_audio_format":"pcm24","input_audio_transcription":{"model":"gummy-realtime-v1"},"turn_detection":null}})";
ws.write(net::buffer(updateJson));
//ws.write(net::buffer(sessionUpdate.dump()));
std::cout << "📤 已发送 session.update\n";
// ===== 等待 session.created 和 session.updated =====
std::cout << "⏳ 等待会话建立...\n";
for (int i = 0; i < 1; ++i) {
beast::flat_buffer buffer;
ws.read(buffer);
std::string message = beast::buffers_to_string(buffer.data());
auto data = json::parse(message);
std::string type = data["type"];
if (type == "session.created") {
std::cout << "✅ Session 创建成功! ID: "
<< data["session"]["id"] << "\n";
} else if (type == "session.updated") {
std::cout << "✅ Session 更新成功!\n";
} else {
std::cout << "📥 收到其他事件: " << type << "\n";
}
}
// ===== 读取并发送音频数据 =====
std::cout << "📂 正在读取音频文件: " << input_audio_file << " ...\n";
auto pcm_data = read_binary_file(input_audio_file);
if (pcm_data.empty()) {
throw std::runtime_error("音频文件为空");
}
std::cout << "✅ 成功读取 " << pcm_data.size() << " 字节 PCM 数据\n\n";
std::cout << "📤 正在发送音频数据...\n";
const size_t CHUNK_SIZE = 3200;
for (size_t i = 0; i < pcm_data.size(); i += CHUNK_SIZE) {
size_t chunk_size = std::min(CHUNK_SIZE, pcm_data.size() - i);
// 使用自定义Base64编码
std::string audio_b64 = base64_encode(pcm_data.data() + i, chunk_size);
json audioAppend = {
{"event_id", generate_event_id()},
{"type", "input_audio_buffer.append"},
{"audio", audio_b64}
};
ws.write(net::buffer(audioAppend.dump()));
std::cout << "📦 发送音频块 " << (i / CHUNK_SIZE + 1)
<< " (" << chunk_size << " 字节)" << std::endl;
// 控制发送速率
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
// ===== 提交音频数据 =====
json commitMsg = {
{"event_id", generate_event_id()},
{"type", "input_audio_buffer.commit"}
};
std::string commitStr = R"({"event_id":"event_13027ba5d901489995895de8a100d88c","type":"input_audio_buffer.commit"})";
//ws.write(net::buffer(commitMsg.dump()));
ws.write(net::buffer(commitStr));
std::cout << "✅ 音频数据提交完成\n";
// beast::flat_buffer commit_buffer;
// ws.read(commit_buffer);
// ===== 发送 response.create 请求 =====
json responseCreate = {
{"event_id", generate_event_id()},
{"type", "response.create"}
};
ws.write(net::buffer(responseCreate.dump()));
std::cout << "🚀 已发送 response.create 请求\n";
// // ===== 等待音频提交确认 =====
// std::cout << "⏳ 等待音频提交确认...\n";
// beast::flat_buffer commit_buffer;
// ws.read(commit_buffer);
// std::string commit_msg = beast::buffers_to_string(commit_buffer.data());
// auto commit_data = json::parse(commit_msg);
//
// if (commit_data["type"] == "input_audio_buffer.committed") {
// std::cout << "✅ 音频提交已确认\n";
// } else {
// std::cout << "⚠️ 收到非确认消息: " << commit_data["type"] << "\n";
// }
std::cout << "⏳ 等待音频提交确认...\n";
bool committed = false;
auto wait_start = std::chrono::steady_clock::now();
while (!committed &&
std::chrono::steady_clock::now() - wait_start < std::chrono::seconds(10)) {
beast::flat_buffer commit_buffer;
ws.read(commit_buffer);
std::string commit_msg = beast::buffers_to_string(commit_buffer.data());
auto commit_data = json::parse(commit_msg);
std::string type = commit_data["type"];
std::cout << "📥 收到事件(等待 committed): " << type << "\n";
if (type == "input_audio_buffer.committed") {
std::cout << "✅ 音频提交已确认\n";
committed = true;
// 【关键】在确认提交后,再发送response.create
json responseCreate = {
{"event_id", generate_event_id()},
{"type", "response.create"}
};
ws.write(net::buffer(responseCreate.dump()));
std::cout << "🚀 已发送 response.create 请求\n";
} else if (type == "error") {
std::cerr << "❌ 提交期间发生错误: " << commit_data["error"]["message"] << "\n";
throw std::runtime_error("提交失败");
}
}
if (!committed) {
throw std::runtime_error("超时:未收到 input_audio_buffer.committed");
}
// ===== 接收和处理响应 =====
std::cout << "🎧 正在等待AI响应...\n";
process_responses(ws, output_audio_file);
// ===== 关闭连接 =====
if (ws.is_open()) {
ws.close(websocket::close_code::normal);
std::cout << "👋 已正常关闭连接\n";
}
std::cout << "🎉 测试完成!\n";
break; // 成功退出重试循环
} catch (const std::exception& e) {
std::cerr << "\n💥 异常: " << e.what() << std::endl;
retry_count++;
if (retry_count < MAX_RETRIES) {
std::cout << "🔄 第 " << retry_count << " 次重试...\n";
std::this_thread::sleep_for(std::chrono::seconds(3));
} else {
std::cerr << "❌ 已达到最大重试次数,程序终止\n";
return 1;
}
}
}
return 0;
}
📤 正在发送音频数据...
✅ 音频数据提交完成
🚀 已发送 response.create 请求
⏳ 等待音频提交确认...
💥 异常: The WebSocket stream was gracefully closed at both endpoints [boost.beast.websocket:1 at E:\tools\boost_1_89_0/boost/beast/websocket/impl/stream.hpp:377 in function 'do_fail']
❌ 已达到最大重试次数,程序终止