一只川川木 2023-09-27 15:00 采纳率: 33.3%
浏览 25
已结题

关于C语言使用线程队列实现多线程并发

流程基本如下:
PC->批处理设备->多个终端设备

1.从PC发出5个命令
2.批处理设备收到命令后,将命令存入线程队列 taskQueue
3.从线程队列中取出task,并5个线程并发处理task
4.5个终端设备同时响应,并开始处理

问题:
但在实际测试中发现,无法实现多线程并发,只能等待一个任务完成后才能执行下一个任务
而在我的分析中,taskHandler这个函数的处理,由于加上了互斥锁,导致此问题
但是不加上互斥锁,又会造成数据混乱,导致终端设备处理出现异常

想请教一下,能否在不去掉互斥锁的前提下(优先保证数据正常),达成多线程并发?
非常感谢!

代码如下:

// 添加成功返回0,失败返回-1
int threadpoolAddTask(ST_TASK task)
{
    int ret;

    pthread_mutex_lock(&mutex);

    ret = enqueue(&taskQueue, task);
    LOG("enqueue ret %d", ret);

    pthread_mutex_unlock(&mutex);

    return ret;
}

//收到命令,将任务入队
void *procRecvCmdThread(void *arg)
{
    int ret;

    while (1)
    {
        int nCmd = recvCmdFromPC();
        // 这里处理需要应用逻辑命令:切换指令、关闭指令
        if (nCmd == SWITCH_CMD || nCmd == CLOSE_CMD)
        {
            // 添加任务
            ST_DEV_CONFIG *configRemain;
            configRemain = mallocAndSetDevConfig(g_curChannelNoTmp, g_curDevChoiceTmp);
            if (NULL == configRemain)
            {
                continue;
            }

            ST_TASK task;
            if (nCmd == SWITCH_CMD)
            {
                task.function = openDevProcess;
            }
            else
            {
                task.function = closeDevProcess;
            }

            task.arg = configRemain;
            ret = threadpoolAddTask(task);
            if (ret != 0)
            {
                sendRespToPC(nCmd, ERR_SELECT_CHANNEL, g_curChannelNoTmp);
                freeAndNULLDevConfig(&configRemain);
            }
        }

        OsSleep(100);
        // LOG("procRecvCmdThread while(1)");
    }
}

// 线程取出并执行任务,任务执行完成需释放参数空间
static void* taskHandler(void* arg)
{
    while(1)
    {
        ST_TASK *task;

        pthread_mutex_lock(&mutex);

        task = dequeue(&taskQueue);

        if (NULL == task)
        {
            pthread_mutex_unlock(&mutex);
            OsSleep(10);
            continue;
        }

        pthread_mutex_unlock(&mutex);
        LOG("dequeue OK");

        task->function(task->arg);
        free(task->arg);
    }
}

//循环创建线程,并执行取出的任务
int threadpoolInit(void)
{
    int i;
    pthread_t threadID[MAX_THREADS_SIZE];

    initializeQueue(&taskQueue);

    for (i = 0; i < MAX_THREADS_SIZE; i++)
    {
        pthread_create(&threadID[i], NULL, taskHandler, NULL);
    }
}
  • 写回答

8条回答 默认 最新

  • 专家-赤兔[在线] 优质创作者: 编程框架技术领域 2023-09-27 15:03
    关注

    引自免费微信小程序-皆我百晓生,希望对题主有所帮助,如有帮助,还望采纳。


    在不去掉互斥锁的前提下,可以使用条件变量(pthread_cond_t)来实现多线程并发。

    条件变量允许线程进入等待状态,等待某个条件满足后才被唤醒执行。在你的代码中,可以将条件变量与互斥锁配合使用,以实现多线程并发执行任务。

    首先,需要在代码中定义一个条件变量和一个互斥锁:

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    

    然后,在任务入队函数中,当任务队列不为空时,发出信号通知等待的线程:

    pthread_mutex_lock(&mutex);
    ret = enqueue(&taskQueue, task);
    LOG("enqueue ret %d", ret);
    pthread_cond_signal(&cond);  // 通知等待的线程
    pthread_mutex_unlock(&mutex);
    

    接着,在任务处理函数中,判断任务队列是否为空,如果为空则进入等待状态:

    pthread_mutex_lock(&mutex);
    while (isEmpty(&taskQueue))
    {
        pthread_cond_wait(&cond, &mutex);  // 进入等待状态
    }
    task = dequeue(&taskQueue);
    pthread_mutex_unlock(&mutex);
    LOG("dequeue OK");
    
    task->function(task->arg);
    free(task->arg);
    

    这样,当有新的任务加入任务队列时,等待的线程会被唤醒,并争夺互斥锁执行任务。

    需要注意的是,互斥锁和条件变量的使用需要遵循一定的规则,确保正确地同步和通信。在你的代码中,可能还需要进行一些细节的调整和修改,以确保代码的正确性和并发性能。

    希望这些信息能帮助到你!如有需要,请随时继续提问。

    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 9月28日
  • 修改了问题 9月27日
  • 创建了问题 9月27日