Jeff_12345678 2025-05-01 13:33 采纳率: 0%
浏览 4

cpprest 多线程 异步任务 偶尔崩溃 debug

你好 , 遇到 cppresdk 多线程 异步任务 有时正常 有时崩溃 求付费咨询 debug 微信 18224449153

struct api::impl
{

public:

const std::string m_url_base;
const std::string m_proxy;
const std::string m_pk;
const std::string m_sk;
const std::size_t m_timeout; //recvWindow
web::http::client::http_client client;

std::mutex        _mutex;        // 1)post 2)保存 time_offset 3)异步回调时保护 msg_sent

std::int64_t      time_offset;
std::string       msg_sent;

impl(
    std::string url_base
    ,std::string proxy
    ,std::string pk
    ,std::string sk
    ,std::size_t timeout
)
    :
    m_url_base{url_base}
    ,m_proxy{proxy}
    ,m_pk{pk}
    ,m_sk{sk}
    ,m_timeout{timeout}
    ,client(U(m_url_base))
{
    time_offset = 0;
    web::http::client::http_client_config config;
    config.set_timeout(std::chrono::seconds(10)); 
    if (m_proxy.empty() == false) { config.set_proxy(web::http::uri(U(m_proxy))); }
    client = web::http::client::http_client(U(m_url_base), config);
}

template<typename T>
struct function_traits;

template<typename R, typename... Args>
struct function_traits<R(*)(Args...)> 
{
    using return_type = R;
    using args_tuple = std::tuple<Args...>;
};

template<typename R, typename... Args>
struct function_traits<R(Args...)> 
{
    using return_type = R;
    using args_tuple = std::tuple<Args...>;
};

template<typename T>
struct function_traits : function_traits<decltype(&T::operator())> {};

template<typename R, typename C, typename... Args>
struct function_traits<R(C::*)(Args...) const> 
{
    using return_type = R;
    using args_tuple = std::tuple<Args...>;
};

template<typename T>
using base_type = typename std::decay<typename std::remove_reference<T>::type>::type;
template<typename CB, typename R = base_type<typename std::tuple_element<3, typename function_traits<CB>::args_tuple>::type>>
api::result<R>
post(bool _signed, const char * target, std::string action,  std::map<std::string,std::string> map,  CB cb) 
{
    pplx::task<web::http::http_response> request_task;
    try
    {
        std::lock_guard<std::mutex> lock(_mutex); //锁  time_offset 、 msg_sent etc.
        static_assert(std::tuple_size<typename function_traits<CB>::args_tuple>::value == 5, "callback     must have at least 5 arguments!");
        //拼接
        std::string starget = target;
        std::string data;
        for ( const auto &it: map ) 
        {
            if ( !data.empty() ) {data += "&";}
            data += it.first;
            data += "=";
            data += it.second;
        }
        //签名
        if ( _signed &&  (std::string(target) != "/api/v3/userDataStream") ) 
        {
            assert(!m_pk.empty() && !m_sk.empty());
            if ( !data.empty() ) {data += "&";}
            data += "timestamp=";
            data += std::to_string(  get_current_ms_epoch() - time_offset );
            data += "&recvWindow=";
            data += std::to_string(m_timeout);
            std::string signature = hmac_sha256( m_sk.c_str(), m_sk.length(), data.c_str(), data.length() );
            data += "&signature=";
            data += signature;
        }

        if ((action == web::http::methods::GET || action == web::http::methods::DEL) && !data.empty())
        {
            starget += "?";
            starget += data;
            data.clear();
        }
        //std::cerr << __MESSAGE("starget: " + starget + ", data:" + data) << std::endl;
        msg_sent = "starget: " + starget + ", data: " + data;
        web::http::http_request req(action);
        req.set_request_uri(starget);
        if (! (action == web::http::methods::GET || action == web::http::methods::DEL) )  {   req.set_body(data, "application/x-www-form-urlencoded"); }
        if ( _signed ) { req.headers().add(U("X-MBX-APIKEY"), U(m_pk)); }   
        request_task = client.request(req);

    }
    catch (const std::exception& e) 
    {
        std::cerr << __MESSAGE("发起 req 错误: " + std::string(e.what())) << std::endl;
        assert(false);
    }

    //----------------同步----------------------------------------------------

    if ( !cb ) 
    {
        api::result<R> res{}; // 用于 catch
        try 
        {
            request_task.then(

                [cb,&res,this](web::http::http_response response) 
                {
                    int statusCode = response.status_code();
                    if (statusCode != web::http::status_codes::OK) 
                    {
                        res.ec = statusCode;
                        res.errmsg = std::string("[错误][2] Http状态码 status_code");
                        res.reply = std::to_string(statusCode);
                        std::string responseBody = response.extract_string().get(); 
                        {
                            std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                            std::cerr << __MESSAGE(responseBody + ", msg:" +  res.errmsg + ", ec: " + std::to_string(res.ec) + ", msg_sent: " + msg_sent) << std::endl;
                        }
                    }
                    std::string responseBody;
                    try
                    {
                        responseBody = response.extract_string().get(); 
                    }
                    catch(const std::exception& e)
                    {
                        res.errmsg = std::string("[错误][3] extract_string().get()");
                        res.reply = responseBody;
                        {
                            std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                            std::cerr << __MESSAGE( res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent ) << std::endl;
                        }
                    }
                    // std::cerr << "Response String: " << responseBody << std::endl;
                    if (responseBody.empty())
                    {
                        res.errmsg = std::string("[错误][4] 空字符串");
                        res.reply = responseBody;
                        {
                            std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                            std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                        }
                    }
                    else if (is_html(responseBody.c_str())) // 即便得到 response 还是 可能异常 html 404
                    {
                        res.errmsg = std::string("[错误][5] HTML 404");
                        res.reply = responseBody;
                        {
                            std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                            std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                        }
                    } 
                    else 
                    {
                        const flatjson::fjson json{responseBody.c_str(), responseBody.length()};
                        if (json.is_empty())
                        {
                            res.errmsg = std::string("[错误][6] null json");
                            res.reply = responseBody;
                            //std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                            /*  
                                PUT userDataStream  返回 空  正常
                                [错误][6] null json, response:{}, msg_sent: starget: /api/v3/userDataStream, data: listenKey=t7Ehn4GpnIIDsC4CN7oCvbfnxCxtzN67lFoNzQ9mKBZo2HGeVR0VR3CwLaIc
                            */
                        }
                        else if ( json.error() != flatjson::FJ_EC_OK ) 
                        {
                            res.ec = json.error();
                            res.errmsg = std::string("[错误][7] json 解析失败");
                            res.reply = std::string(" response: ") + responseBody + ", ec: " + std::to_string(res.ec) + ", error_string: " + json.error_string();
                            {
                                std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                                std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                            }
                        }
                        else if ( json.is_object() && binapi::rest::is_api_error(json) ) 
                        {
                            auto error = binapi::rest::construct_error(json); 
                            res.ec = error.first; 
                            res.errmsg = std::string("[错误][8] api 状态码");
                            res.reply = responseBody;//{"code":" ","msg":" "}
                            {
                                std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                                std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                            }
                        } 
                        else 
                        {
                            try 
                            {
                                res.v = R::construct(json);
                            }
                            catch (const std::exception& e) 
                            {
                                res.errmsg = std::string("[错误][9] json construct");
                                res.reply = std::string("response: ") + responseBody + ", e.what(): " + e.what();
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                                }
                            }
                        }
                    }   
           
                }
            ).wait(); 
        } 
        catch (const std::exception& e) 
        {
            res.errmsg = std::string("[错误][1] 绑定回调");
            res.reply = e.what();
            {
                std::lock_guard<std::mutex> lock(_mutex); //锁  msg_sent 
                std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                //api_usdt.cpp(317): [错误][1] 绑定回调, response:Error in SSL handshake, msg_sent: starget: /fapi/v1/klines?endTime=1746021448706&interval=1m&limit=40&symbol=ARPAUSDT, data: 
            }
        }
        return res;
    } 
    //----------------异步----------------------------------------------------
    else 
    {
        /*
            异常
            1、异常            .request()
            2、Http状态码       status_code != 200 
            3、无法提取字符串    extract_string().get() 
            4、空 字符串        responseBody.empty()
            5、HTML 404        is_html(responseBody.c_str())
            6、空 json          json.is_empty()
            7、json 解析失败     json.error() != 0
            8、json api error   (API 状态码, 例如  {"code":-1021,"msg": "  ")
            9、异常 json construct 及回调函数
            10、回调函数          
        */

        try 
        {
            request_task.then(
                //&cb ---> [报错][栈溢出] stack smashing detected
                [cb, map1 = std::move(map),this](web::http::http_response response) 
                {
                    api::result<R> res{};

                    assert(cb != nullptr);
                    int statusCode = response.status_code();
                    if (statusCode != web::http::status_codes::OK) 
                    {
                        std::string responseBody = response.extract_string().get(); 
                        res.ec = statusCode;
                        res.errmsg = std::string("[错误][2] Http状态码 status_code");
                        {
                            std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                            std::cerr << __MESSAGE(responseBody + ", msg:"+res.errmsg + ", ec:" + std::to_string(res.ec) + ", msg_sent: " + msg_sent) << std::endl;
                        }
                        cb(__MAKE_FILELINE, 2, res.errmsg, res.v, map1);//撤单报错 ec=400, emsg={"code":-2011,"msg":"Unknown order sent."}
                        return res;
                    }
                    else
                    {
                        std::string responseBody;
                        try
                        {
                            responseBody = response.extract_string().get(); 
                        }
                        catch(const std::exception& e)
                        {
                            res.errmsg = std::string("[错误][3] extract_string().get()");
                            res.reply = std::string(e.what());
                            {
                                std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                                std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                            }
                            cb(__MAKE_FILELINE, 3, res.errmsg, res.v, map1);
                            return res;
                        }
                        // std::cerr << "Response String: " << responseBody << std::endl;
                        if (responseBody.empty())
                        {
                            res.errmsg = std::string("[错误][4] 空字符串");
                            res.reply = responseBody;
                            {
                                std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                                std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                            }
                            cb(__MAKE_FILELINE, 4, res.errmsg, res.v, map1);
                            return res;
                        }
                        else if (is_html(responseBody.c_str())) // 即便得到 response 还是 可能异常 html 404
                        {
                            res.errmsg = std::string("[错误][5] HTML 404");
                            res.reply = responseBody;
                            {
                                std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                                std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                            }
                            cb(__MAKE_FILELINE, 5, res.errmsg, res.v, map1);
                            return res;
                        } 
                        else 
                        {
                            const flatjson::fjson json{responseBody.c_str(), responseBody.length()};

                            if (json.is_empty())
                            {
                                res.errmsg = std::string("[错误][6] null json");
                                res.reply = responseBody;
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 6, res.errmsg, res.v, map1);
                                return res;
                            }
                            else if ( json.error() != flatjson::FJ_EC_OK ) 
                            {
                                res.ec = json.error();
                                res.errmsg = std::string("[错误][7] json解析失败");
                                res.reply = std::string(" response: ") + responseBody + ", ec: " + std::to_string(res.ec) + ", error_string: " + json.error_string();
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 7, res.errmsg, res.v, map1);
                                return res;
                            }
                            else if ( json.is_object() && binapi::rest::is_api_error(json) ) 
                            {
                                auto error = binapi::rest::construct_error(json); 
                                res.ec = error.first;
                                if (error.first != 200)
                                {
                                    res.errmsg = std::string("[错误][8] api 状态码");
                                    res.reply = responseBody; //{"code":" ","msg":" "}
                                    {
                                        std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                                        std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent) << std::endl;
                                    }
                                    cb(__MAKE_FILELINE, 8, res.errmsg, res.v, map1);
                                    return res;
                                }
                            } 
                            try 
                            {
                                res.v = R::construct(json);
                                res.errmsg = "";
                                cb(__MAKE_FILELINE, 0, res.errmsg, res.v, map1);//  无错 res.ec == 0
                                return res;
                            }
                            catch (const std::exception& e) 
                            {
                                res.errmsg = std::string("[错误][9] json construct");
                                res.reply = std::string("response: ") + responseBody + ", e.what():" + std::string(e.what());
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                                    std::cerr << __MESSAGE( res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent ) << std::endl;
                                }
                                cb(__MAKE_FILELINE,9, res.errmsg, res.v, map1);
                                return res;
                            }
                        } 
                    } 
                }//
            //).wait(); 
            );
            // ).then([](pplx::task<void> t) 
            // // 无论第一个 .then() 是否抛出异常,都会进入第二个 .then()
            // {
            //     try 
            //     {
            //         t.get(); // 显式捕获异常 
            //     } 
            //     catch (const std::exception& e) 
            //     {
            //         std::cerr << __MESSAGE("Async error: " + std::string(e.what())) << std::endl;
            //     }
            // });
        } 
        catch (const std::exception& e) 
        {
            api::result<R> res{};
            res.errmsg = std::string("[错误][1] 绑定回调函数 错误");
            res.reply = e.what();
            {
                std::lock_guard<std::mutex> lock(_mutex); //保护 msg_sent
                std::cerr << __MESSAGE( res.errmsg + ", response:" + res.reply + ", msg_sent: " + msg_sent ) << std::endl;
            }
            cb( __MAKE_FILELINE, 1, res.errmsg, res.v, map);
            return res;
        }
        return api::result<R>{};
    }
}; // post

};//api::impl

  • 写回答

3条回答 默认 最新

  • 紫雾凌寒 新星创作者: 人工智能技术领域 2025-05-01 13:50
    关注

    让 “紫雾凌寒” 来帮你回答,本回答结合 DeepSeek

    如果还有问题,请评论或点击头像私信,如果有帮助到您, 请关注、采纳、点赞

    1. 问题关键点分析
      - 从提供的信息来看,关键问题在于使用cpprestsdk进行多线程异步任务时出现不稳定的崩溃情况。已知代码片段非常简短且不完整,不太能直接看出问题所在。可能的原因包括线程安全问题、资源竞争、异步任务管理不当、未正确处理错误或异常等。
    2. 解决方案
      - 代码审查
      • 首先,需要完整展示相关的多线程异步任务代码。例如,假设存在一个简单的异步HTTP请求任务在多线程中执行的场景:
    #include <cpprest/http_client.h>
    #include <cpprest/asyncrt_utils.h>
    #include <thread>
    #include <vector>
    
    using namespace web::http;
    using namespace web::http::client;
    
    void performAsyncTask() {
        http_client client(U("http://example.com"));
        pplx::task<http_response> task = client.request(methods::GET);
        task.then([](http_response response) {
            // 处理响应
            utility::string_t response_body = response.extract_string().get();
            std::cout << "Response body: " << response_body << std::endl;
        }).wait();
    }
    
    int main() {
        std::vector<std::thread> threads;
        for (int i = 0; i < 10; ++i) {
            threads.emplace_back(performAsyncTask);
        }
        for (auto& th : threads) {
            th.join();
        }
        return 0;
    }
    
    • 线程安全检查
      • 检查cpprestsdk对象在多线程中的使用是否线程安全。例如,http_client对象是否被多个线程同时访问和修改。如果不是线程安全的,需要进行适当的同步。
      • 对于上述代码,http_client对象在每个线程中独立创建和使用,相对比较安全。但如果在更复杂的场景中共享了某些cpprestsdk资源,需要特别注意。
    • 错误处理
      • 在异步任务的处理过程中添加更完善的错误处理。比如,在task.then的回调中添加错误检查:
    task.then([](http_response response) {
        if (response.status_code()!= status_codes::OK) {
            std::cerr << "Request failed with status code: " << response.status_code() << std::endl;
            return;
        }
        utility::string_t response_body = response.extract_string().get();
        std::cout << "Response body: " << response_body << std::endl;
    }).wait();
    
    • 资源管理
      • 确保异步任务正确管理资源。例如,在任务完成后及时释放相关资源。在上述简单的HTTP请求示例中,http_client对象在任务结束后会自动析构,但在更复杂的场景中可能需要手动管理资源。
        3. 多种解决方案的优缺点
    • 代码审查
      • 优点:全面检查代码逻辑,可能直接发现导致崩溃的代码错误。
      • 缺点:比较耗时,需要熟悉cpprestsdk和多线程编程,对于复杂代码库可能难以快速定位问题。
    • 线程安全检查
      • 优点:针对性强,能快速发现线程安全相关的潜在问题。
      • 缺点:依赖对cpprestsdk线程安全特性的了解,可能遗漏其他类型的问题。
    • 错误处理
      • 优点:能及时捕获和处理异步任务中的错误,避免错误积累导致崩溃。
      • 缺点:不能从根本上解决不稳定崩溃的问题,只是在出现错误时能更好地提示。
    • 资源管理
      • 优点:保证资源的正确使用和释放,防止因资源问题导致崩溃。
      • 缺点:需要对代码中使用的所有资源有清晰的了解,确保管理正确。
        4. 总结
    • 要解决cpprestsdk多线程异步任务偶尔崩溃的问题,需要从多个方面进行排查。首先要确保代码逻辑正确,然后检查线程安全、错误处理和资源管理等方面。通过逐步分析和测试,定位并解决导致崩溃的具体原因。

    需要注意的是,仅根据目前提供的少量代码片段很难完全准确地解决问题,完整的代码分析和调试是必要的。同时,关于付费咨询,在这个平台上主要是提供免费的技术交流和解答,你可以进一步详细描述问题,以便更有效地解决。

    希望以上解答对您有所帮助。如果您有任何疑问,欢迎在评论区提出。

    评论

报告相同问题?

问题事件

  • 创建了问题 5月1日