你好,遇到 cpprestsdk 多线程异步任务有时正常、有时崩溃的问题,求付费咨询 debug,微信 18224449153
/**
 * Implementation object behind `api`: owns the cpprestsdk HTTP client and
 * turns (target, method, parameters) into a REST request, optionally signed
 * with HMAC-SHA256, then decodes the JSON reply into `api::result<R>`.
 *
 * `post()` runs in one of two modes, chosen by whether `cb` is empty:
 *   - synchronous  (`!cb`): blocks until the reply is decoded and returns it;
 *   - asynchronous (`cb` set): returns immediately and reports through `cb`.
 */
struct api::impl
{
public:
    const std::string m_url_base;   // base URL the http_client is bound to
    const std::string m_proxy;      // proxy URI; empty means no proxy is configured
    const std::string m_pk;         // sent as the X-MBX-APIKEY header on signed requests
    const std::string m_sk;         // key used for the HMAC-SHA256 signature
    const std::size_t m_timeout;    // sent as `recvWindow` on signed requests
    web::http::client::http_client client;
    // Guards request building (time_offset, msg_sent) and serializes the
    // std::cerr diagnostics written from response handlers.
    std::mutex _mutex;
    std::int64_t time_offset;       // subtracted from the local ms clock when building `timestamp`
    std::string msg_sent;           // description of the most recently built request (diagnostics only)

    /**
     * Builds the client configuration: a 10 second timeout and, when `proxy`
     * is non-empty, that proxy.
     */
    static web::http::client::http_client_config make_client_config(const std::string& proxy)
    {
        web::http::client::http_client_config config;
        config.set_timeout(std::chrono::seconds(10));
        if (!proxy.empty()) { config.set_proxy(web::http::uri(U(proxy))); }
        return config;
    }

    /**
     * @param url_base base URL for every request
     * @param proxy    proxy URI, or empty for a direct connection
     * @param pk       API key
     * @param sk       secret key used for signing
     * @param timeout  value sent as `recvWindow` on signed requests
     *
     * The client is constructed exactly once, directly with its final
     * configuration. Initializers follow member declaration order, so
     * `m_url_base` and `m_proxy` are already set when `client` is built.
     */
    impl(
        std::string url_base
        ,std::string proxy
        ,std::string pk
        ,std::string sk
        ,std::size_t timeout
    )
        :
        m_url_base{std::move(url_base)}
        ,m_proxy{std::move(proxy)}
        ,m_pk{std::move(pk)}
        ,m_sk{std::move(sk)}
        ,m_timeout{timeout}
        ,client(U(m_url_base), make_client_config(m_proxy))
        ,time_offset{0}
    {
    }

    // function_traits<F>: exposes the return type and the argument tuple of a
    // function pointer, a function type, or a callable with a const operator().
    template<typename T>
    struct function_traits;
    template<typename R, typename... Args>
    struct function_traits<R(*)(Args...)>
    {
        using return_type = R;
        using args_tuple = std::tuple<Args...>;
    };
    template<typename R, typename... Args>
    struct function_traits<R(Args...)>
    {
        using return_type = R;
        using args_tuple = std::tuple<Args...>;
    };
    // Generic callables are inspected through their operator().
    template<typename T>
    struct function_traits : function_traits<decltype(&T::operator())> {};
    template<typename R, typename C, typename... Args>
    struct function_traits<R(C::*)(Args...) const>
    {
        using return_type = R;
        using args_tuple = std::tuple<Args...>;
    };
    // T with references and cv-qualifiers stripped.
    template<typename T>
    using base_type = typename std::decay<typename std::remove_reference<T>::type>::type;

    /**
     * Sends one REST request and decodes the reply.
     *
     * @tparam CB callback type with exactly five parameters; it is invoked as
     *            `cb(file_line, code, errmsg, value, params)`.
     * @tparam R  payload type, deduced from the callback's 4th parameter; must
     *            provide `R::construct(json)`.
     * @param _signed when true the API key header is added and, except for
     *                "/api/v3/userDataStream", timestamp/recvWindow/signature
     *                are appended to the parameters.
     * @param target  request path.
     * @param action  HTTP method; GET and DEL carry the parameters in the
     *                query string, other methods in a form-encoded body.
     * @param map     request parameters, joined as `k=v&...` verbatim (no
     *                URL-encoding is applied here).
     * @param cb      empty -> synchronous mode; otherwise asynchronous mode.
     * @return synchronous mode: the decoded result or the recorded error.
     *         asynchronous mode: an empty result (the real outcome goes to
     *         `cb`), or the error result if the continuation could not be
     *         attached.
     *
     * Codes passed to `cb` in asynchronous mode:
     *   0 success
     *   1 the request failed, or the continuation could not be attached
     *   2 HTTP status != 200
     *   3 the body could not be extracted
     *   4 empty body
     *   5 HTML body (e.g. a 404 page)
     *   6 empty JSON
     *   7 JSON parse failure
     *   8 API-level error object (e.g. {"code":-1021,"msg":"..."})
     *   9 R::construct or the success callback threw
     * Any other exception escaping the handler (typically thrown by `cb`
     * itself) is logged as [10]; `cb` is not invoked again for it.
     *
     * NOTE(review): the asynchronous continuation captures `this` for the log
     * mutex, so this object must outlive every in-flight request.
     */
    template<typename CB, typename R = base_type<typename std::tuple_element<3, typename function_traits<CB>::args_tuple>::type>>
    api::result<R>
    post(bool _signed, const char * target, std::string action, std::map<std::string,std::string> map, CB cb)
    {
        static_assert(std::tuple_size<typename function_traits<CB>::args_tuple>::value == 5, "callback must have exactly 5 arguments!");

        pplx::task<web::http::http_response> request_task;
        // Per-request copy of the request description. Response handlers log
        // this instead of the shared `msg_sent`, which a later post() may have
        // overwritten by the time the reply arrives.
        std::string sent;
        try
        {
            std::lock_guard<std::mutex> lock(_mutex); // guards time_offset and msg_sent
            // Join the parameters.
            std::string starget = target;
            std::string data;
            for ( const auto &it: map )
            {
                if ( !data.empty() ) {data += "&";}
                data += it.first;
                data += "=";
                data += it.second;
            }
            // Sign.
            if ( _signed && (std::string(target) != "/api/v3/userDataStream") )
            {
                assert(!m_pk.empty() && !m_sk.empty());
                if ( !data.empty() ) {data += "&";}
                data += "timestamp=";
                data += std::to_string( get_current_ms_epoch() - time_offset );
                data += "&recvWindow=";
                data += std::to_string(m_timeout);
                std::string signature = hmac_sha256( m_sk.c_str(), m_sk.length(), data.c_str(), data.length() );
                data += "&signature=";
                data += signature;
            }
            const bool params_in_query = (action == web::http::methods::GET || action == web::http::methods::DEL);
            if (params_in_query && !data.empty())
            {
                starget += "?";
                starget += data;
                data.clear();
            }
            sent = "starget: " + starget + ", data: " + data;
            msg_sent = sent;
            web::http::http_request req(action);
            req.set_request_uri(starget);
            if ( !params_in_query ) { req.set_body(data, "application/x-www-form-urlencoded"); }
            if ( _signed ) { req.headers().add(U("X-MBX-APIKEY"), U(m_pk)); }
            request_task = client.request(req);
        }
        catch (const std::exception& e)
        {
            std::cerr << __MESSAGE("发起 req 错误: " + std::string(e.what())) << std::endl;
            assert(false);
        }
        //---------------- synchronous ------------------------------------------
        if ( !cb )
        {
            api::result<R> res{};
            try
            {
                // get() blocks until the reply arrives and rethrows request
                // failures, which the catch below records as [1].
                web::http::http_response response = request_task.get();
                const int statusCode = response.status_code();
                if (statusCode != web::http::status_codes::OK)
                {
                    res.ec = statusCode;
                    res.errmsg = std::string("[错误][2] Http状态码 status_code");
                    res.reply = std::to_string(statusCode);
                    const std::string errorBody = response.extract_string().get();
                    {
                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                        std::cerr << __MESSAGE(errorBody + ", msg:" + res.errmsg + ", ec: " + std::to_string(res.ec) + ", msg_sent: " + sent) << std::endl;
                    }
                    // Stop here: the body has been consumed, and continuing
                    // would replace this error with a bogus "empty body" one.
                    return res;
                }
                std::string responseBody;
                try
                {
                    responseBody = response.extract_string().get();
                }
                catch(const std::exception& e)
                {
                    res.errmsg = std::string("[错误][3] extract_string().get()");
                    res.reply = e.what();
                    {
                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                        std::cerr << __MESSAGE( res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent ) << std::endl;
                    }
                    return res; // keep [3] instead of falling through to [4]
                }
                if (responseBody.empty())
                {
                    res.errmsg = std::string("[错误][4] 空字符串");
                    res.reply = responseBody;
                    {
                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                        std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                    }
                    return res;
                }
                if (is_html(responseBody.c_str())) // a reply can still be an HTML error page (e.g. 404)
                {
                    res.errmsg = std::string("[错误][5] HTML 404");
                    res.reply = responseBody;
                    {
                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                        std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                    }
                    return res;
                }
                const flatjson::fjson json{responseBody.c_str(), responseBody.length()};
                if (json.is_empty())
                {
                    // Not logged: an empty object is the normal reply to
                    // PUT /api/v3/userDataStream.
                    res.errmsg = std::string("[错误][6] null json");
                    res.reply = responseBody;
                    return res;
                }
                if ( json.error() != flatjson::FJ_EC_OK )
                {
                    res.ec = json.error();
                    res.errmsg = std::string("[错误][7] json 解析失败");
                    res.reply = std::string(" response: ") + responseBody + ", ec: " + std::to_string(res.ec) + ", error_string: " + json.error_string();
                    {
                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                        std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                    }
                    return res;
                }
                if ( json.is_object() && binapi::rest::is_api_error(json) )
                {
                    auto error = binapi::rest::construct_error(json);
                    res.ec = error.first;
                    res.errmsg = std::string("[错误][8] api 状态码");
                    res.reply = responseBody; // {"code":" ","msg":" "}
                    {
                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                        std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                    }
                    return res;
                }
                try
                {
                    res.v = R::construct(json);
                }
                catch (const std::exception& e)
                {
                    res.errmsg = std::string("[错误][9] json construct");
                    res.reply = std::string("response: ") + responseBody + ", e.what(): " + e.what();
                    {
                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                        std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                    }
                }
            }
            catch (const std::exception& e)
            {
                // Request-level failures end up here, e.g. "Error in SSL handshake".
                res.errmsg = std::string("[错误][1] 绑定回调");
                res.reply = e.what();
                {
                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                }
            }
            return res;
        }
        //---------------- asynchronous -----------------------------------------
        else
        {
            try
            {
                // Task-based continuation: it runs even when the request
                // failed, so the failure is observed via get() and reported to
                // `cb`. A value-based continuation would be skipped and leave
                // the exception in a discarded task.
                // `map` is copied rather than moved so the catch below can
                // still hand the parameters to `cb`.
                request_task.then(
                    [cb, map1 = map, sent, this](pplx::task<web::http::http_response> finished)
                    {
                        api::result<R> res{};
                        try
                        {
                            assert(cb != nullptr);
                            web::http::http_response response;
                            try
                            {
                                response = finished.get();
                            }
                            catch (const std::exception& e)
                            {
                                res.errmsg = std::string("[错误][1] 异常 .request()");
                                res.reply = e.what();
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 1, res.errmsg, res.v, map1);
                                return res;
                            }
                            const int statusCode = response.status_code();
                            if (statusCode != web::http::status_codes::OK)
                            {
                                // The error body is only used for the log line;
                                // failing to read it must not hide the HTTP error.
                                std::string errorBody;
                                try
                                {
                                    errorBody = response.extract_string().get();
                                }
                                catch (const std::exception& e)
                                {
                                    errorBody = std::string("<body unavailable: ") + e.what() + ">";
                                }
                                res.ec = statusCode;
                                res.errmsg = std::string("[错误][2] Http状态码 status_code");
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE(errorBody + ", msg:"+res.errmsg + ", ec:" + std::to_string(res.ec) + ", msg_sent: " + sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 2, res.errmsg, res.v, map1); // e.g. cancel order: ec=400, {"code":-2011,"msg":"Unknown order sent."}
                                return res;
                            }
                            std::string responseBody;
                            try
                            {
                                responseBody = response.extract_string().get();
                            }
                            catch(const std::exception& e)
                            {
                                res.errmsg = std::string("[错误][3] extract_string().get()");
                                res.reply = std::string(e.what());
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 3, res.errmsg, res.v, map1);
                                return res;
                            }
                            if (responseBody.empty())
                            {
                                res.errmsg = std::string("[错误][4] 空字符串");
                                res.reply = responseBody;
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 4, res.errmsg, res.v, map1);
                                return res;
                            }
                            if (is_html(responseBody.c_str())) // a reply can still be an HTML error page (e.g. 404)
                            {
                                res.errmsg = std::string("[错误][5] HTML 404");
                                res.reply = responseBody;
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 5, res.errmsg, res.v, map1);
                                return res;
                            }
                            const flatjson::fjson json{responseBody.c_str(), responseBody.length()};
                            if (json.is_empty())
                            {
                                res.errmsg = std::string("[错误][6] null json");
                                res.reply = responseBody;
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 6, res.errmsg, res.v, map1);
                                return res;
                            }
                            if ( json.error() != flatjson::FJ_EC_OK )
                            {
                                res.ec = json.error();
                                res.errmsg = std::string("[错误][7] json解析失败");
                                res.reply = std::string(" response: ") + responseBody + ", ec: " + std::to_string(res.ec) + ", error_string: " + json.error_string();
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                                }
                                cb(__MAKE_FILELINE, 7, res.errmsg, res.v, map1);
                                return res;
                            }
                            if ( json.is_object() && binapi::rest::is_api_error(json) )
                            {
                                auto error = binapi::rest::construct_error(json);
                                res.ec = error.first;
                                // An error-shaped object whose code is 200 is
                                // not treated as a failure; it falls through
                                // to R::construct below.
                                if (error.first != 200)
                                {
                                    res.errmsg = std::string("[错误][8] api 状态码");
                                    res.reply = responseBody; // {"code":" ","msg":" "}
                                    {
                                        std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                        std::cerr << __MESSAGE(res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent) << std::endl;
                                    }
                                    cb(__MAKE_FILELINE, 8, res.errmsg, res.v, map1);
                                    return res;
                                }
                            }
                            try
                            {
                                res.v = R::construct(json);
                                res.errmsg = "";
                                cb(__MAKE_FILELINE, 0, res.errmsg, res.v, map1); // success
                                return res;
                            }
                            catch (const std::exception& e)
                            {
                                res.errmsg = std::string("[错误][9] json construct");
                                res.reply = std::string("response: ") + responseBody + ", e.what():" + std::string(e.what());
                                {
                                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                                    std::cerr << __MESSAGE( res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent ) << std::endl;
                                }
                                cb(__MAKE_FILELINE,9, res.errmsg, res.v, map1);
                                return res;
                            }
                        }
                        // Nothing may escape: the task returned by then() is
                        // discarded, so an exception stored in it would never
                        // be observed.
                        catch (const std::exception& e)
                        {
                            std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                            std::cerr << __MESSAGE(std::string("[错误][10] 回调函数") + ", e.what(): " + e.what() + ", msg_sent: " + sent) << std::endl;
                        }
                        catch (...)
                        {
                            std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                            std::cerr << __MESSAGE(std::string("[错误][10] 回调函数") + ", msg_sent: " + sent) << std::endl;
                        }
                        return res;
                    }
                );
            }
            catch (const std::exception& e)
            {
                // then() itself failed, e.g. because request_task was never
                // assigned after the request could not be built.
                api::result<R> res{};
                res.errmsg = std::string("[错误][1] 绑定回调函数 错误");
                res.reply = e.what();
                {
                    std::lock_guard<std::mutex> lock(_mutex); // serialize log output
                    std::cerr << __MESSAGE( res.errmsg + ", response:" + res.reply + ", msg_sent: " + sent ) << std::endl;
                }
                cb( __MAKE_FILELINE, 1, res.errmsg, res.v, map);
                return res;
            }
            return api::result<R>{};
        }
    } // post
};//api::impl