szy_szya 2022-11-18 02:46
浏览 111
已结题

libwebsocket,数据发不出去,lws_cancel_service无法触发write回调

问题遇到的现象和发生背景

用libwebsocket开源库做arm下ws客户端的开发,刚开始运行时收发都是正常的,大概过一段时间(不确定)后,数据的RECEIVE依然是正常的,可以收到服务器发来的数据,但是无法WRITE给服务器,卡住了,调查后发现lws_cancel_service调用LWS_CALLBACK_EVENT_WAIT_CANCELLED的cmd后,程序没有进入到LWS_CALLBACK_CLIENT_WRITEABLE中,求解!ws回调代码已贴出

用代码块功能插入代码,请勿粘贴截图
static int WS_Service_Callback(struct lws *wsi, enum lws_callback_reasons reason,void *user, void *in, size_t len)
{
    Per_Vhost_Data_Minimal_t *vhd=(Per_Vhost_Data_Minimal_t *)lws_protocol_vh_priv_get(lws_get_vhost(wsi),lws_get_protocol(wsi));
    switch(reason)
    {
        case LWS_CALLBACK_CLIENT_CONNECTION_ERROR:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_CLIENT_CONNECTION_ERROR",reason);
            lwsl_err("CLIENT CONNECTION ERROR:%s\n",in?(char *)in:"(null)");
            vhd->Client_Wsi=NULL;
            lws_timed_callback_vh_protocol(vhd->Vhost,vhd->Protocol,LWS_CALLBACK_USER,1);
        }
        break;
        case LWS_CALLBACK_PROTOCOL_INIT:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_PROTOCOL_INIT",reason);
            NMS_DEBUG_INFO("INIT user=%p ",lws_context_user(lws_get_context(wsi)));
            vhd=lws_protocol_vh_priv_zalloc(lws_get_vhost(wsi),lws_get_protocol(wsi),sizeof(Per_Vhost_Data_Minimal_t));
            vhd->Context=lws_get_context(wsi);
            vhd->Protocol=lws_get_protocol(wsi);
            vhd->Vhost=lws_get_vhost(wsi);
            vhd->PrivateData=lws_context_user(lws_get_context(wsi));
            pthread_mutex_init(&vhd->Lock_Ring,NULL);
            pthread_cond_init(&vhd->Cond_Ring,NULL);
            vhd->Ring = lws_ring_create(sizeof(MessageComm_t),32,Minimal_Free_MessageComm);
            if(NMS_Linker_Client_Connect(vhd))
            {
                printf("PROTOCOL INIT server connect fail \n");
                lws_timed_callback_vh_protocol(vhd->Vhost,vhd->Protocol,LWS_CALLBACK_USER,1);
            }
        }
        break;
        case LWS_CALLBACK_CLIENT_ESTABLISHED:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_CLIENT_ESTABLISHED",reason);            
            // NMS_DEBUG_INFO("user =%p",user);
            // NMS_DEBUG_INFO("wsi user=%p",lws_wsi_user(wsi));
            vhd->Established=1;
            NMS_Linker_Operation_Handle_t *linkerHandle=(NMS_Linker_Operation_Handle_t *)vhd->PrivateData;            
            vhd->Nms_Comm=NmsCommConnector_Open(&linkerHandle->Local_Cfg,&linkerHandle->Remote_Cfg.Token);
            pthread_create(&vhd->Th_HeartBeat,NULL,NMS_Linker_HeartBeat_Task,vhd); 
        }
        break;
        case LWS_CALLBACK_CLIENT_WRITEABLE:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_CLIENT_WRITEABLE",reason);
            time_t tnow=time(NULL);
            struct tm *tmNow=localtime(&tnow);
            char timeStr[32]={0};
            strftime(timeStr,32,"%c",tmNow);
            pthread_mutex_lock(&vhd->Lock_Ring);
            const MessageComm_t *pmsg=lws_ring_get_element(vhd->Ring,&vhd->Tail);
            int w_count=lws_write(wsi,pmsg->Payload+LWS_PRE,pmsg->Len,LWS_WRITE_TEXT);
            NMS_DEBUG_INFO("[Time=%s] write_count=%d msg_len=%d",timeStr,w_count,pmsg->Len);
            lws_ring_consume_single_tail(vhd->Ring,&vhd->Tail,1);
            if(lws_ring_get_element(vhd->Ring,&vhd->Tail))
            {
                lws_callback_on_writable(wsi);
            }
            pthread_mutex_unlock(&vhd->Lock_Ring);
        }
        break;
        case LWS_CALLBACK_CLIENT_RECEIVE:
        {
            // NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_CLIENT_RECEIVE",reason);
            //NMS_DEBUG_INFO("in=%s",(const char *)in);
            cJSON *receiveJ=cJSON_Parse((const char *)in);
            cJSON *cmdJ=cJSON_GetObjectItem(receiveJ,"cmd");
            char cmdString[64]={0};
            strcpy(cmdString,cmdJ->valuestring);
            cJSON_Delete(receiveJ);
            char *pModeString=NULL;
            NMS_AES_Decrypt_Message(cmdString,&pModeString);
            Nms_Cmd_Mode_t cMode;
            NmsCommConnector_Message_Direction(pModeString,&cMode);
            free(pModeString);
            pModeString=NULL;
            NMS_DEBUG_INFO("CMD:[%s_%d_%s]",cMode.DevModeToNms,cMode.Direction,cMode.CmdMode);
            if(0==strcmp(cMode.CmdMode,NMS_CMD_MODE_HEARTBEAT))
            {
                NMS_DEBUG_INFO("NMS_CMD_MODE_HEARTBEAT ");
                NmsCommConnector_HeartBeat_Receive(vhd->Nms_Comm,(const char *)in);
            }
            else if(0==strcmp(cMode.CmdMode,NMS_CMD_MODE_MESSAGE))
            {
                NMS_DEBUG_INFO("NMS_CMD_MODE_MESSAGE");
                NmsCommConnector_Request(vhd->Nms_Comm,(const char *)in);
                NmsCommConnector_Run(vhd->Nms_Comm);
                const char *pRespString=NULL;
                NmsCommConnector_Respone(vhd->Nms_Comm,&pRespString);
                //NMS_DEBUG_INFO("[Respone WS]%s\n",pRespString);
                pthread_mutex_lock(&vhd->Lock_Ring);
                int ring_free_count=lws_ring_get_count_free_elements(vhd->Ring);
                if(!ring_free_count)
                {
                    NMS_DEBUG_INFO(" [!ring_free_count]");
                    pthread_mutex_unlock(&vhd->Lock_Ring);
                }
                else
                {
                    int n;
                    MessageComm_t msg;
                    int respLength=strlen(pRespString)+1;
                    //NMS_DEBUG_INFO("WS Comm Message LEN=%d",respLength);
                    msg.Payload=malloc((unsigned int )(LWS_PRE+respLength));      
                    n=lws_snprintf((char *)msg.Payload+LWS_PRE,(unsigned int)respLength,pRespString);
                    msg.Len = (unsigned int)n;
                    if(!lws_ring_insert(vhd->Ring,&msg,1))
                    {
                        Minimal_Free_MessageComm(&msg);
                    }
                    lws_cancel_service(vhd->Context);
                    pthread_mutex_unlock(&vhd->Lock_Ring);
                    NMS_DEBUG_INFO(" [ring_free_count] success ");
                }
            }
            else
            {
                NMS_DEBUG_ERR("THE RECEIVE MESSAGE IS UNKNOWN!!!!!! [Cmd=%s]",cmdString);
            }
        }
        break;
        case LWS_CALLBACK_CLIENT_CLOSED:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_CLIENT_CLOSED",reason);
            vhd->Client_Wsi=NULL;
            vhd->Established=0;
            //heart beat task free
            vhd->IsStop_HeartBeat=1;
            pthread_cond_signal(&vhd->Cond_Ring);
            NmsCommConnector_Close(vhd->Nms_Comm);
            pthread_join(vhd->Th_HeartBeat,NULL);
            int closed_return=lws_timed_callback_vh_protocol(vhd->Vhost,vhd->Protocol,LWS_CALLBACK_USER,1);
            // NMS_DEBUG_INFO("[closed_return=%2d] NMS-DEBUG-PROBLEM-1",closed_return);
        }
        break;
        case LWS_CALLBACK_EVENT_WAIT_CANCELLED:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_EVENT_WAIT_CANCELLED",reason);
            if (vhd &&vhd->Client_Wsi&& vhd->Established)
            {
                lws_callback_on_writable(vhd->Client_Wsi);
            }
        }
        break;
        case LWS_CALLBACK_PROTOCOL_DESTROY:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_PROTOCOL_DESTROY",reason);
            if(vhd->Ring)
            {
                lws_ring_destroy(vhd->Ring);
            }        
            pthread_mutex_destroy(&vhd->Lock_Ring);
            pthread_cond_destroy(&vhd->Cond_Ring);
        }
        break;
        case LWS_CALLBACK_USER:
        {
            NMS_DEBUG_INFO("[reason=%2d] LWS_CALLBACK_USER",reason);
            if (NMS_Linker_Client_Connect(vhd))
            {
                lws_timed_callback_vh_protocol(vhd->Vhost,vhd->Protocol,LWS_CALLBACK_USER, 1);
            }
        }
        break;
        default:
        {
            NMS_DEBUG_INFO("[reason=%2d] UNCALLER",reason);
            NMS_DEBUG_INFO("wsi user=%p",lws_wsi_user(wsi));
            
        }
        break;
    }
    return lws_callback_http_dummy(wsi, reason, user, in, len);
}
运行结果及报错内容

img

我的解答思路和尝试过的方法

调试打印后,发现异常是lws_callback_on_writable突然无法触发LWS_CALLBACK_CLIENT_WRITEABLE了,之后就不能发送了,只能收,ring队列长度我设置为32,上述情况发生后,队列长度会一直到30,然后在get_count_free_ring中判断,队列空闲小于2时,就不会插入了,由此判断,问题发生在发送。

我想要达到的结果

数据可以正常发送,不会卡住,

展开全部

  • 写回答

0条回答 默认 最新

      编辑
      预览

      报告相同问题?

      问题事件

      • 系统已结题 11月25日
      • 创建了问题 11月18日
      手机看
      程序员都在用的中文IT技术交流社区

      程序员都在用的中文IT技术交流社区

      专业的中文 IT 技术社区,与千万技术人共成长

      专业的中文 IT 技术社区,与千万技术人共成长

      关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

      关注【CSDN】视频号,行业资讯、技术分享精彩不断,直播好礼送不停!

      客服 返回
      顶部