一只川川木 2023-09-27 15:00 采纳率: 33.3%
浏览 20
已结题

关于C语言使用线程队列实现多线程并发

流程基本如下:
PC->批处理设备->多个终端设备

1.从PC发出5个命令
2.批处理设备收到命令后,将命令存入线程队列 taskQueue
3.从线程队列中取出task,并5个线程并发处理task
4.5个终端设备同时响应,并开始处理

问题:
但在实际测试中发现,无法实现多线程并发,只能等待一个任务完成后才能执行下一个任务
而在我的分析中,taskHandler这个函数的处理,由于加上了互斥锁,导致此问题
但是不加上互斥锁,又会造成数据混乱,导致终端设备处理出现异常

想请教一下,能否在不去掉互斥锁的前提下(优先保证数据正常),达成多线程并发?
非常感谢!

代码如下:

// 添加成功返回0,失败返回-1
int threadpoolAddTask(ST_TASK task)
{
    int ret;

    pthread_mutex_lock(&mutex);

    ret = enqueue(&taskQueue, task);
    LOG("enqueue ret %d", ret);

    pthread_mutex_unlock(&mutex);

    return ret;
}

//收到命令,将任务入队
void *procRecvCmdThread(void *arg)
{
    int ret;

    while (1)
    {
        int nCmd = recvCmdFromPC();
        // 这里处理需要应用逻辑命令:切换指令、关闭指令
        if (nCmd == SWITCH_CMD || nCmd == CLOSE_CMD)
        {
            // 添加任务
            ST_DEV_CONFIG *configRemain;
            configRemain = mallocAndSetDevConfig(g_curChannelNoTmp, g_curDevChoiceTmp);
            if (NULL == configRemain)
            {
                continue;
            }

            ST_TASK task;
            if (nCmd == SWITCH_CMD)
            {
                task.function = openDevProcess;
            }
            else
            {
                task.function = closeDevProcess;
            }

            task.arg = configRemain;
            ret = threadpoolAddTask(task);
            if (ret != 0)
            {
                sendRespToPC(nCmd, ERR_SELECT_CHANNEL, g_curChannelNoTmp);
                freeAndNULLDevConfig(&configRemain);
            }
        }

        OsSleep(100);
        // LOG("procRecvCmdThread while(1)");
    }
}

// 线程取出并执行任务,任务执行完成需释放参数空间
static void* taskHandler(void* arg)
{
    while(1)
    {
        ST_TASK *task;

        pthread_mutex_lock(&mutex);

        task = dequeue(&taskQueue);

        if (NULL == task)
        {
            pthread_mutex_unlock(&mutex);
            OsSleep(10);
            continue;
        }

        pthread_mutex_unlock(&mutex);
        LOG("dequeue OK");

        task->function(task->arg);
        free(task->arg);
    }
}

//循环创建线程,并执行取出的任务
int threadpoolInit(void)
{
    int i;
    pthread_t threadID[MAX_THREADS_SIZE];

    initializeQueue(&taskQueue);

    for (i = 0; i < MAX_THREADS_SIZE; i++)
    {
        pthread_create(&threadID[i], NULL, taskHandler, NULL);
    }
}
  • 写回答

8条回答 默认 最新

  • 专家-赤兔[在线] 全栈领域优质创作者 2023-09-27 15:03
    关注

    引自免费微信小程序-皆我百晓生,希望对题主有所帮助,如有帮助,还望采纳。


    在不去掉互斥锁的前提下,可以使用条件变量(pthread_cond_t)来实现多线程并发。

    条件变量允许线程进入等待状态,等待某个条件满足后才被唤醒执行。在你的代码中,可以将条件变量与互斥锁配合使用,以实现多线程并发执行任务。

    首先,需要在代码中定义一个条件变量和一个互斥锁:

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    

    然后,在任务入队函数中,当任务队列不为空时,发出信号通知等待的线程:

    pthread_mutex_lock(&mutex);
    ret = enqueue(&taskQueue, task);
    LOG("enqueue ret %d", ret);
    pthread_cond_signal(&cond);  // 通知等待的线程
    pthread_mutex_unlock(&mutex);
    

    接着,在任务处理函数中,判断任务队列是否为空,如果为空则进入等待状态:

    pthread_mutex_lock(&mutex);
    while (isEmpty(&taskQueue))
    {
        pthread_cond_wait(&cond, &mutex);  // 进入等待状态
    }
    task = dequeue(&taskQueue);
    pthread_mutex_unlock(&mutex);
    LOG("dequeue OK");
    
    task->function(task->arg);
    free(task->arg);
    

    这样,当有新的任务加入任务队列时,等待的线程会被唤醒,并争夺互斥锁执行任务。

    需要注意的是,互斥锁和条件变量的使用需要遵循一定的规则,确保正确地同步和通信。在你的代码中,可能还需要进行一些细节的调整和修改,以确保代码的正确性和并发性能。

    希望这些信息能帮助到你!如有需要,请随时继续提问。

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 9月28日
  • 修改了问题 9月27日
  • 创建了问题 9月27日

悬赏问题

  • ¥15 如何在node.js中或者java中给wav格式的音频编码成sil格式呢
  • ¥15 不小心不正规的开发公司导致不给我们y码,
  • ¥15 我的代码无法在vc++中运行呀,错误很多
  • ¥50 求一个win系统下运行的可自动抓取arm64架构deb安装包和其依赖包的软件。
  • ¥60 fail to initialize keyboard hotkeys through kernel.0000000000
  • ¥30 ppOCRLabel导出识别结果失败
  • ¥15 Centos7 / PETGEM
  • ¥15 csmar数据进行spss描述性统计分析
  • ¥15 各位请问平行检验趋势图这样要怎么调整?说标准差差异太大了
  • ¥15 delphi webbrowser组件网页下拉菜单自动选择问题