forlaolei 2025-11-12 17:08 采纳率: 0%
浏览 12
已结题

c++接入Qwen-Omni-Realtime问题

c++接入Qwen-Omni-Realtime问题

C++ 代码在发送 input_audio_buffer.commit 之后,连接就被关闭了。音频数据本身没有问题,同样的流程用 Java 代码可以正常运行。

#include <iostream>
#include <string>
#include <thread>
#include <chrono>
#include <fstream>
#include <vector>
#include <random>
#include "json.hpp"

// Boost.Asio + Beast
#include <boost/asio.hpp>
#include <boost/asio/ssl.hpp>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/beast/websocket/ssl.hpp>

using json = nlohmann::json;
namespace net = boost::asio;
namespace ssl = net::ssl;
namespace beast = boost::beast;
namespace websocket = beast::websocket;
using tcp = net::ip::tcp;

// Standard (RFC 4648) Base64 alphabet. The encoder below always pads its
// output with '=' so it matches java.util.Base64's default encoder.
static const std::string base64_chars =
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    "abcdefghijklmnopqrstuvwxyz"
    "0123456789+/";

/// Encodes `len` bytes starting at `data` as padded standard Base64.
///
/// @param data pointer to the bytes to encode (may be null when len == 0)
/// @param len  number of bytes to encode
/// @return the Base64 text; its length is always a multiple of 4
std::string base64_encode(const uint8_t* data, size_t len) {
    std::string out;
    out.reserve((len + 2) / 3 * 4);

    size_t pos = 0;
    // Every complete 3-byte group becomes exactly 4 output characters.
    for (; pos + 3 <= len; pos += 3) {
        const uint32_t group = (static_cast<uint32_t>(data[pos]) << 16) |
                               (static_cast<uint32_t>(data[pos + 1]) << 8) |
                               static_cast<uint32_t>(data[pos + 2]);
        out += base64_chars[(group >> 18) & 0x3F];
        out += base64_chars[(group >> 12) & 0x3F];
        out += base64_chars[(group >> 6) & 0x3F];
        out += base64_chars[group & 0x3F];
    }

    // A trailing group of 1 or 2 bytes is zero-filled and padded with '='.
    const size_t remaining = len - pos;
    if (remaining != 0) {
        uint32_t group = static_cast<uint32_t>(data[pos]) << 16;
        if (remaining == 2) {
            group |= static_cast<uint32_t>(data[pos + 1]) << 8;
        }
        out += base64_chars[(group >> 18) & 0x3F];
        out += base64_chars[(group >> 12) & 0x3F];
        out += (remaining == 2) ? base64_chars[(group >> 6) & 0x3F] : '=';
        out += '=';
    }
    return out;
}

/// Loads the complete contents of `filepath` as raw bytes.
///
/// @param filepath path of the file to read (opened in binary mode)
/// @return every byte of the file, in order (empty for an empty file)
/// @throws std::runtime_error if the file cannot be opened
std::vector<uint8_t> read_binary_file(const std::string& filepath) {
    std::ifstream input(filepath, std::ios::binary);
    if (input.fail()) {
        throw std::runtime_error("无法打开文件: " + filepath);
    }
    const std::istreambuf_iterator<char> begin_it(input);
    const std::istreambuf_iterator<char> end_it;
    return std::vector<uint8_t>(begin_it, end_it);
}

/// Writes `data` to `filepath` as raw bytes, replacing any existing file.
///
/// @param filepath destination path (opened in binary mode, truncated)
/// @param data     bytes to write; an empty vector produces an empty file
/// @throws std::runtime_error if the file cannot be created, or if writing
///         or closing it fails (e.g. disk full) — previously such failures
///         were silently ignored
void write_binary_file(const std::string& filepath, const std::vector<uint8_t>& data) {
    std::ofstream file(filepath, std::ios::binary);
    if (!file) {
        throw std::runtime_error("无法创建文件: " + filepath);
    }
    file.write(reinterpret_cast<const char*>(data.data()),
               static_cast<std::streamsize>(data.size()));
    // close() flushes; a failed write or flush leaves the stream in a failed state.
    file.close();
    if (!file) {
        throw std::runtime_error("写入文件失败: " + filepath);
    }
}

/// Builds a client-side event id: "event_" followed by 16 random characters
/// drawn from [a-zA-Z0-9].
///
/// The generator state lives in function-local statics and is mutated on
/// every call, so concurrent calls from several threads need external locking.
std::string generate_event_id() {
    static const std::string alphabet = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789";
    static std::random_device seed_source;
    static std::mt19937 engine(seed_source());
    static std::uniform_int_distribution<> pick(0, alphabet.size() - 1);

    std::string result("event_");
    result.reserve(result.size() + 16);
    for (int remaining = 16; remaining > 0; --remaining) {
        result.push_back(alphabet[pick(engine)]);
    }
    return result;
}

// Decodes standard (RFC 4648) Base64 text, e.g. the "delta" payload of a
// response.audio.delta event. Characters outside the Base64 alphabet (such as
// line breaks) are skipped, and decoding stops at the first '=' pad character.
static std::vector<uint8_t> base64_decode(const std::string& encoded) {
    std::vector<uint8_t> out;
    out.reserve(encoded.size() / 4 * 3);
    uint32_t accumulator = 0;
    int pending_bits = 0;
    for (const char c : encoded) {
        uint32_t sextet = 0;
        if (c >= 'A' && c <= 'Z') {
            sextet = static_cast<uint32_t>(c - 'A');
        } else if (c >= 'a' && c <= 'z') {
            sextet = static_cast<uint32_t>(c - 'a' + 26);
        } else if (c >= '0' && c <= '9') {
            sextet = static_cast<uint32_t>(c - '0' + 52);
        } else if (c == '+') {
            sextet = 62;
        } else if (c == '/') {
            sextet = 63;
        } else if (c == '=') {
            break;
        } else {
            continue;
        }
        // Unsigned wrap-around is harmless: only the low (pending_bits + 6) bits are used.
        accumulator = (accumulator << 6) | sextet;
        pending_bits += 6;
        if (pending_bits >= 8) {
            pending_bits -= 8;
            out.push_back(static_cast<uint8_t>((accumulator >> pending_bits) & 0xFFu));
        }
    }
    return out;
}

/// Reads server events until one of the following happens:
/// - response.done arrives;
/// - an "error" event arrives;
/// - the connection is closed or a read fails;
/// - 60 s have elapsed. ws.read() blocks, so the limit is only checked between messages.
///
/// Text and transcript deltas are printed and accumulated. Audio deltas are
/// Base64-decoded, and if any arrived, the decoded bytes are written unmodified
/// to `output_audio_file`.
///
/// @param ws                WebSocket stream that has completed its handshake
/// @param output_audio_file path the received response audio is written to
void process_responses(websocket::stream<ssl::stream<tcp::socket>>& ws,
                      const std::string& output_audio_file) {
    std::string full_text;
    std::vector<uint8_t> audio_buffer;
    bool response_completed = false;
    bool aborted = false;  // loop ended because of a close, read error or server error

    const auto start_time = std::chrono::steady_clock::now();

    while (!response_completed && !aborted &&
           std::chrono::steady_clock::now() - start_time < std::chrono::seconds(60)) {

        beast::flat_buffer response_buffer;
        beast::error_code ec;

        // Blocking read
        ws.read(response_buffer, ec);

        if (ec) {
            if (ec == websocket::error::closed) {
                // The server sent a close frame; its code/reason explain why.
                const websocket::close_reason& cr = ws.reason();
                std::cout << "ℹ️ 连接已关闭 (close code " << cr.code << ", reason \""
                          << std::string(cr.reason.data(), cr.reason.size()) << "\")\n";
            } else {
                std::cerr << "⚠️ 读取错误: " << ec.message() << "\n";
            }
            aborted = true;
            break;
        }

        const std::string response_msg = beast::buffers_to_string(response_buffer.data());
        // Non-throwing parse: a single malformed frame should not abort the session.
        json response_data = json::parse(response_msg, nullptr, false);
        if (response_data.is_discarded() || !response_data.is_object()) {
            std::cerr << "⚠️ 无法解析的消息: " << response_msg << "\n";
            continue;
        }
        const std::string type = response_data.value("type", "");

        std::cout << "📥 收到事件: " << type << "\n";

        if (type == "session.created") {
            std::cout << "✅ Session 创建成功! ID: "
                      << response_data["session"]["id"] << "\n";
        } else if (type == "session.updated") {
            std::cout << "✅ Session 更新成功!\n";
        } else if (type == "input_audio_buffer.committed") {
            std::cout << "✅ 音频提交已确认\n";
        } else if (type == "conversation.item.created") {
            std::cout << "💬 对话项已创建\n";
        } else if (type == "conversation.item.input_audio_transcription.completed") {
            if (response_data.contains("transcript")) {
                std::string transcript = response_data["transcript"].get<std::string>();
                std::cout << "🎤 语音识别结果: " << transcript << "\n";
            }
        } else if (type == "response.created") {
            std::cout << "✅ 响应已创建\n";
        } else if (type == "response.text.delta") {
            if (response_data.contains("delta")) {
                std::string delta = response_data["delta"].get<std::string>();
                full_text += delta;
                std::cout << "💬 " << delta << std::flush;
            }
        } else if (type == "response.audio.delta") {
            if (response_data.contains("delta") && response_data["delta"].is_string()) {
                const std::vector<uint8_t> chunk =
                    base64_decode(response_data["delta"].get<std::string>());
                audio_buffer.insert(audio_buffer.end(), chunk.begin(), chunk.end());
                std::cout << "🔊 收到音频数据 (" << chunk.size() << " 字节)\n";
            }
        } else if (type == "response.audio_transcript.delta") {
            if (response_data.contains("delta")) {
                std::string delta = response_data["delta"].get<std::string>();
                full_text += delta;
                std::cout << "📄 " << delta << std::flush;
            }
        } else if (type == "response.done") {
            std::cout << "\n\n✅ AI 响应完成!\n";
            if (!full_text.empty()) {
                std::cout << "📝 完整文本回答: " << full_text << "\n";
            }
            response_completed = true;
        } else if (type == "error") {
            // Guarded access: "error" may be missing or not an object, in which
            // case error["..."] / value() would throw nlohmann::json::type_error.
            std::string error_msg = "未知错误";
            json err_code;  // stays null unless the event carries a code
            if (response_data.contains("error") && response_data["error"].is_object()) {
                const json& err = response_data["error"];
                error_msg = err.value("message", error_msg);
                if (err.contains("code")) {
                    err_code = err["code"];
                }
            }
            if (response_data.contains("code")) {
                err_code = response_data["code"];
            }
            std::cerr << "❌ 服务端错误: " << error_msg << "\n";
            if (!err_code.is_null()) {
                std::cerr << "   错误代码: " << err_code << "\n";
            }
            aborted = true;
            break;
        } else {
            std::cout << "📥 其他事件: " << response_msg << "\n";
        }
    }

    // Only report a timeout when the loop actually ran out of time.
    if (!response_completed && !aborted) {
        std::cout << "⚠️ 响应未在超时时间内完成\n";
    }

    if (!audio_buffer.empty()) {
        try {
            write_binary_file(output_audio_file, audio_buffer);
            std::cout << "💾 已保存响应音频 (" << audio_buffer.size() << " 字节) 到 "
                      << output_audio_file << "\n";
        } catch (const std::exception& e) {
            std::cerr << "⚠️ 保存响应音频失败: " << e.what() << "\n";
        }
    }
}

int main() {
#ifdef _WIN32
    SetConsoleOutputCP(CP_UTF8);
#endif

    const int MAX_RETRIES = 1;
    int retry_count = 0;

    std::string apiKey = "";
    std::string input_audio_file = "F:/q1_16khz.pcm";
    std::string output_audio_file = "F:/response.pcm";

    while (retry_count < MAX_RETRIES) {
        try {
            std::string host = "dashscope.aliyuncs.com";
            std::string port = "443";
            std::string target = "/api-ws/v1/realtime?model=qwen3-omni-flash-realtime";

            std::cout << "🔧 配置信息:\n";
            std::cout << "   API Key: sk-******* (已隐藏部分)\n";
            std::cout << "   主机: " << host << "\n";
            std::cout << "   端口: " << port << "\n";
            std::cout << "   目标: " << target << "\n\n";

            // ===== 初始化网络上下文 =====
            net::io_context ioc;
            ssl::context ctx(ssl::context::tlsv13_client);
            ctx.set_default_verify_paths();

            // ===== 解析域名 =====
            tcp::resolver resolver(ioc);
            auto const results = resolver.resolve(host, port);
            std::cout << "🔍 正在解析域名: " << host << ":" << port << "\n";

            // ===== 创建SSL连接 =====
            ssl::stream<tcp::socket> stream(ioc, ctx);
            net::connect(stream.next_layer(), results.begin(), results.end());
            std::cout << "🔌 已建立TCP连接\n";

            // ===== 设置SNI =====
            if (!SSL_set_tlsext_host_name(stream.native_handle(), host.c_str())) {
                throw std::runtime_error("设置SNI失败");
            }

            // ===== SSL握手 =====
            stream.handshake(ssl::stream_base::client);
            std::cout << "🤝 SSL握手成功\n";

            // ===== 创建WebSocket =====
            websocket::stream<ssl::stream<tcp::socket>> ws(std::move(stream));

            // ===== 禁用压缩 =====
            websocket::permessage_deflate pmd;
            pmd.client_enable = false;
            pmd.server_enable = false;
            ws.set_option(pmd);

            // ===== 设置HTTP头(与Java保持一致)=====
            ws.set_option(websocket::stream_base::decorator(
                [&apiKey](websocket::request_type& req) {
                    req.set(beast::http::field::authorization, "Bearer " + apiKey);
                    req.set(beast::http::field::user_agent,
                           "dashscope/2.17.0; java/21.0.6; platform/Windows 11; processor/amd64");
                }
            ));

            // ===== WebSocket握手 =====
            ws.handshake(host, target);
            std::cout << "✅ 成功连接到 Qwen 实时服务\n\n";

            beast::flat_buffer buffer;
            ws.read(buffer);
            std::string message = beast::buffers_to_string(buffer.data());
            auto data = json::parse(message);

            std::string type = data["type"];
            std::cout << "create:" << type << std::endl;

            // ===== 发送 session.update =====
            json sessionUpdate = {
                {"event_id", generate_event_id()},
                {"type", "session.update"},
                {"session", {
                    {"modalities", {"audio", "text"}},  // 注意:顺序与Java一致
                    {"voice", "Cherry"},
                    {"input_audio_format", "pcm16"},
                    {"output_audio_format", "pcm24"},
                    {"instructions", "你是私人AI高级助手,请准确且友好地解答用户关于天气、网上实时信息、日期、旅游线路规划。请始终以专业和乐于助人的态度回应。"},
                    {"turn_detection", nullptr},  // 重要:使用nullptr而不是"null"
                    {"input_audio_transcription", {
                        {"model", "gummy-realtime-v1"}
                    }}
                }}
            };

            std::string event_id = generate_event_id();
            std::string updateJson =
                    R"({"event_id":")" + event_id
                            + R"(","type":"session.update","session":{"voice":"Cherry","input_audio_format":"pcm16","instructions":"你是私人AI高级助手,请准确且友好地解答用户关于天气、网上实时信息、日期、旅游线路规划。请始终以专业和乐于助人的态度回应。","modalities":["audio","text"],"output_audio_format":"pcm24","input_audio_transcription":{"model":"gummy-realtime-v1"},"turn_detection":null}})";
            ws.write(net::buffer(updateJson));

            //ws.write(net::buffer(sessionUpdate.dump()));
            std::cout << "📤 已发送 session.update\n";

            // ===== 等待 session.created 和 session.updated =====
            std::cout << "⏳ 等待会话建立...\n";
            for (int i = 0; i < 1; ++i) {
                beast::flat_buffer buffer;
                ws.read(buffer);
                std::string message = beast::buffers_to_string(buffer.data());
                auto data = json::parse(message);

                std::string type = data["type"];
                if (type == "session.created") {
                    std::cout << "✅ Session 创建成功! ID: "
                              << data["session"]["id"] << "\n";
                } else if (type == "session.updated") {
                    std::cout << "✅ Session 更新成功!\n";
                } else {
                    std::cout << "📥 收到其他事件: " << type << "\n";
                }
            }

            // ===== 读取并发送音频数据 =====
            std::cout << "📂 正在读取音频文件: " << input_audio_file << " ...\n";
            auto pcm_data = read_binary_file(input_audio_file);
            if (pcm_data.empty()) {
                throw std::runtime_error("音频文件为空");
            }
            std::cout << "✅ 成功读取 " << pcm_data.size() << " 字节 PCM 数据\n\n";

            std::cout << "📤 正在发送音频数据...\n";
            const size_t CHUNK_SIZE = 3200;

           for (size_t i = 0; i < pcm_data.size(); i += CHUNK_SIZE) {
                size_t chunk_size = std::min(CHUNK_SIZE, pcm_data.size() - i);

              // 使用自定义Base64编码
                std::string audio_b64 = base64_encode(pcm_data.data() + i, chunk_size);

                json audioAppend = {
                   {"event_id", generate_event_id()},
                    {"type", "input_audio_buffer.append"},
                   {"audio", audio_b64}
                };

               ws.write(net::buffer(audioAppend.dump()));
               std::cout << "📦 发送音频块 " << (i / CHUNK_SIZE + 1)
                       << " (" << chunk_size << " 字节)" << std::endl;

              // 控制发送速率
                std::this_thread::sleep_for(std::chrono::milliseconds(100));
            }

           

          
            // ===== 提交音频数据 =====
            json commitMsg = {
                {"event_id", generate_event_id()},
                {"type", "input_audio_buffer.commit"}
            };
            std::string commitStr = R"({"event_id":"event_13027ba5d901489995895de8a100d88c","type":"input_audio_buffer.commit"})";
            //ws.write(net::buffer(commitMsg.dump()));
            ws.write(net::buffer(commitStr));
            std::cout << "✅ 音频数据提交完成\n";
//            beast::flat_buffer commit_buffer;
//            ws.read(commit_buffer);
            // ===== 发送 response.create 请求 =====
            json responseCreate = {
                {"event_id", generate_event_id()},
                {"type", "response.create"}
            };
            ws.write(net::buffer(responseCreate.dump()));
            std::cout << "🚀 已发送 response.create 请求\n";

//            // ===== 等待音频提交确认 =====
//            std::cout << "⏳ 等待音频提交确认...\n";
//            beast::flat_buffer commit_buffer;
//            ws.read(commit_buffer);
//            std::string commit_msg = beast::buffers_to_string(commit_buffer.data());
//            auto commit_data = json::parse(commit_msg);
//
//            if (commit_data["type"] == "input_audio_buffer.committed") {
//                std::cout << "✅ 音频提交已确认\n";
//            } else {
//                std::cout << "⚠️ 收到非确认消息: " << commit_data["type"] << "\n";
//            }

            std::cout << "⏳ 等待音频提交确认...\n";
            bool committed = false;
            auto wait_start = std::chrono::steady_clock::now();

            while (!committed &&
                   std::chrono::steady_clock::now() - wait_start < std::chrono::seconds(10)) {

                beast::flat_buffer commit_buffer;
                ws.read(commit_buffer);
                std::string commit_msg = beast::buffers_to_string(commit_buffer.data());
                auto commit_data = json::parse(commit_msg);

                std::string type = commit_data["type"];
                std::cout << "📥 收到事件(等待 committed): " << type << "\n";

                if (type == "input_audio_buffer.committed") {
                    std::cout << "✅ 音频提交已确认\n";
                    committed = true;

                    // 【关键】在确认提交后,再发送response.create
                    json responseCreate = {
                        {"event_id", generate_event_id()},
                        {"type", "response.create"}
                    };
                    ws.write(net::buffer(responseCreate.dump()));
                    std::cout << "🚀 已发送 response.create 请求\n";

                } else if (type == "error") {
                    std::cerr << "❌ 提交期间发生错误: " << commit_data["error"]["message"] << "\n";
                    throw std::runtime_error("提交失败");
                }
            }

            if (!committed) {
                throw std::runtime_error("超时:未收到 input_audio_buffer.committed");
            }

            // ===== 接收和处理响应 =====
            std::cout << "🎧 正在等待AI响应...\n";
            process_responses(ws, output_audio_file);

            // ===== 关闭连接 =====
            if (ws.is_open()) {
                ws.close(websocket::close_code::normal);
                std::cout << "👋 已正常关闭连接\n";
            }

            std::cout << "🎉 测试完成!\n";
            break; // 成功退出重试循环

        } catch (const std::exception& e) {
            std::cerr << "\n💥 异常: " << e.what() << std::endl;
            retry_count++;

            if (retry_count < MAX_RETRIES) {
                std::cout << "🔄 第 " << retry_count << " 次重试...\n";
                std::this_thread::sleep_for(std::chrono::seconds(3));
            } else {
                std::cerr << "❌ 已达到最大重试次数,程序终止\n";
                return 1;
            }
        }
    }

    return 0;
}

📤 正在发送音频数据...
✅ 音频数据提交完成
🚀 已发送 response.create 请求
⏳ 等待音频提交确认...

💥 异常: The WebSocket stream was gracefully closed at both endpoints [boost.beast.websocket:1 at E:\tools\boost_1_89_0/boost/beast/websocket/impl/stream.hpp:377 in function 'do_fail']
❌ 已达到最大重试次数,程序终止



  • 写回答

5条回答 默认 最新

  • 阿里嘎多学长 2025-11-12 17:11
    关注

    阿里嘎多学长整理AIGC生成,因移动端显示问题导致当前答案未能完全显示,请使用PC端查看更加详细的解答过程

    问题分析

    • 你使用 C++ 接入 Qwen-Omni-Realtime 服务
    • 在发送 input_audio_buffer.commit 后,连接被关闭
    • Java 代码可以正常运行
    • C++ 代码包含了 #include 语句,但未完整显示

    解决方案

    1. 检查连接关闭原因 确保连接关闭不是由于其他原因引起的,例如网络问题、服务端问题等。
    2. 检查 C++ 代码 检查 C++ 代码是否正确发送 input_audio_buffer.commit,是否有任何错误或异常。
    3. 比较 Java 和 C++ 代码 比较 Java 和 C++ 代码的差异,是否有任何关键区别。
    4. 检查 Qwen-Omni-Realtime 服务文档 检查 Qwen-Omni-Realtime 服务文档,是否有任何相关信息或示例代码。

    核心代码

    // NOTE(review): illustrative pseudo-code from the answer; it is not runnable
    // against the program above. `connection`, `Connection::State` and
    // `input_audio_buffer.commit()` do not exist there. With Boost.Beast, a
    // server-side close surfaces as websocket::error::closed from ws.read(),
    // and ws.reason() holds the close code/reason the server sent.
    // Check the connection state
    if (connection->getState() == Connection::State::CLOSED) {
        // Reconnect or otherwise handle the closed connection
    }
    
    // Send input_audio_buffer.commit
    if (input_audio_buffer.commit()) {
        // Handle success
    } else {
        // Handle failure
    }
    

    注意 上述代码仅为示例,具体实现需要根据 Qwen-Omni-Realtime 服务文档和你的 C++ 代码进行调整。

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 12月2日
  • 创建了问题 11月12日