一只川川木 2023-09-27 15:00 采纳率: 33.3%
浏览 18
已结题

关于C语言使用线程队列实现多线程并发

流程基本如下:
PC->批处理设备->多个终端设备

1.从PC发出5个命令
2.批处理设备收到命令后,将命令存入线程队列 taskQueue
3.从线程队列中取出task,并5个线程并发处理task
4.5个终端设备同时响应,并开始处理

问题:
但在实际测试中发现,无法实现多线程并发,只能等待一个任务完成后才能执行下一个任务
而在我的分析中,taskHandler这个函数的处理,由于加上了互斥锁,导致此问题
但是不加上互斥锁,又会造成数据混乱,导致终端设备处理出现异常

想请教一下,能否在不去掉互斥锁的前提下(优先保证数据正常),达成多线程并发?
非常感谢!

代码如下:

// 添加成功返回0,失败返回-1
int threadpoolAddTask(ST_TASK task)
{
    int ret;

    pthread_mutex_lock(&mutex);

    ret = enqueue(&taskQueue, task);
    LOG("enqueue ret %d", ret);

    pthread_mutex_unlock(&mutex);

    return ret;
}

//收到命令,将任务入队
void *procRecvCmdThread(void *arg)
{
    int ret;

    while (1)
    {
        int nCmd = recvCmdFromPC();
        // 这里处理需要应用逻辑命令:切换指令、关闭指令
        if (nCmd == SWITCH_CMD || nCmd == CLOSE_CMD)
        {
            // 添加任务
            ST_DEV_CONFIG *configRemain;
            configRemain = mallocAndSetDevConfig(g_curChannelNoTmp, g_curDevChoiceTmp);
            if (NULL == configRemain)
            {
                continue;
            }

            ST_TASK task;
            if (nCmd == SWITCH_CMD)
            {
                task.function = openDevProcess;
            }
            else
            {
                task.function = closeDevProcess;
            }

            task.arg = configRemain;
            ret = threadpoolAddTask(task);
            if (ret != 0)
            {
                sendRespToPC(nCmd, ERR_SELECT_CHANNEL, g_curChannelNoTmp);
                freeAndNULLDevConfig(&configRemain);
            }
        }

        OsSleep(100);
        // LOG("procRecvCmdThread while(1)");
    }
}

// 线程取出并执行任务,任务执行完成需释放参数空间
static void* taskHandler(void* arg)
{
    while(1)
    {
        ST_TASK *task;

        pthread_mutex_lock(&mutex);

        task = dequeue(&taskQueue);

        if (NULL == task)
        {
            pthread_mutex_unlock(&mutex);
            OsSleep(10);
            continue;
        }

        pthread_mutex_unlock(&mutex);
        LOG("dequeue OK");

        task->function(task->arg);
        free(task->arg);
    }
}

//循环创建线程,并执行取出的任务
int threadpoolInit(void)
{
    int i;
    pthread_t threadID[MAX_THREADS_SIZE];

    initializeQueue(&taskQueue);

    for (i = 0; i < MAX_THREADS_SIZE; i++)
    {
        pthread_create(&threadID[i], NULL, taskHandler, NULL);
    }
}
  • 写回答

8条回答 默认 最新

  • 专家-赤兔[在线] 2023-09-27 15:03
    关注

    引自免费微信小程序-皆我百晓生,希望对题主有所帮助,如有帮助,还望采纳。


    在不去掉互斥锁的前提下,可以使用条件变量(pthread_cond_t)来实现多线程并发。

    条件变量允许线程进入等待状态,等待某个条件满足后才被唤醒执行。在你的代码中,可以将条件变量与互斥锁配合使用,以实现多线程并发执行任务。

    首先,需要在代码中定义一个条件变量和一个互斥锁:

    pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
    pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
    

    然后,在任务入队函数中,当任务队列不为空时,发出信号通知等待的线程:

    pthread_mutex_lock(&mutex);
    ret = enqueue(&taskQueue, task);
    LOG("enqueue ret %d", ret);
    pthread_cond_signal(&cond);  // 通知等待的线程
    pthread_mutex_unlock(&mutex);
    

    接着,在任务处理函数中,判断任务队列是否为空,如果为空则进入等待状态:

    pthread_mutex_lock(&mutex);
    while (isEmpty(&taskQueue))
    {
        pthread_cond_wait(&cond, &mutex);  // 进入等待状态
    }
    task = dequeue(&taskQueue);
    pthread_mutex_unlock(&mutex);
    LOG("dequeue OK");
    
    task->function(task->arg);
    free(task->arg);
    

    这样,当有新的任务加入任务队列时,等待的线程会被唤醒,并争夺互斥锁执行任务。

    需要注意的是,互斥锁和条件变量的使用需要遵循一定的规则,确保正确地同步和通信。在你的代码中,可能还需要进行一些细节的调整和修改,以确保代码的正确性和并发性能。

    希望这些信息能帮助到你!如有需要,请随时继续提问。

    评论
  • 忧伤的玩不起 2023-09-27 15:18
    关注

    如果锁只是为了锁队列,那在取完消息后就可以解锁了,而不是执行完所有代码再解锁.或者你可以试试无锁队列

    评论
  • Leodong. 2023-09-27 15:24
    关注

    该回答通过自己思路及引用到GPTᴼᴾᴱᴺᴬᴵ搜索,得到内容具体如下:
    在不去掉互斥锁的前提下,你可以尝试使用信号量来实现多线程并发。信号量是一个计数器,用于管理对共享资源的访问。当信号量的值大于0时,表示可以访问共享资源;当信号量的值为0时,表示无法访问共享资源。

    你可以为每个终端设备创建一个信号量,并在处理任务时等待信号量。当一个终端设备完成任务并释放信号量后,其他等待的终端设备可以获取信号量并开始处理任务。这样可以实现多线程并发。

    修改后的代码如下:

    // 添加成功返回0,失败返回-1
    int threadpoolAddTask(ST_TASK task)
    {
        int ret;
     
        pthread_mutex_lock(&mutex);
     
        ret = enqueue(&taskQueue, task);
        LOG("enqueue ret %d", ret);
     
        pthread_mutex_unlock(&mutex);
     
        return ret;
    }
     
    //收到命令,将任务入队
    void *procRecvCmdThread(void *arg)
    {
        int ret;
     
        while (1)
        {
            int nCmd = recvCmdFromPC();
            // 这里处理需要应用逻辑命令:切换指令、关闭指令
            if (nCmd == SWITCH_CMD || nCmd == CLOSE_CMD)
            {
                // 添加任务
                ST_DEV_CONFIG *configRemain;
                configRemain = mallocAndSetDevConfig(g_curChannelNoTmp, g_curDevChoiceTmp);
                if (NULL == configRemain)
                {
                    continue;
                }
     
                ST_TASK task;
                if (nCmd == SWITCH_CMD)
                {
                    task.function = openDevProcess;
                }
                else
                {
                    task.function = closeDevProcess;
                }
     
                task.arg = configRemain;
                ret = threadpoolAddTask(task);
                if (ret != 0)
                {
                    sendRespToPC(nCmd, ERR_SELECT_CHANNEL, g_curChannelNoTmp);
                    freeAndNULLDevConfig(&configRemain);
                }
            }
     
            OsSleep(100);
            // LOG("procRecvCmdThread while(1)");
        }
    }
     
    // 线程取出并执行任务,任务执行完成需释放参数空间
    static void* taskHandler(void* arg)
    {
        while(1)
        {
            ST_TASK *task;
     
            pthread_mutex_lock(&mutex);
     
            task = dequeue(&taskQueue);
     
            if (NULL == task)
            {
                pthread_mutex_unlock(&mutex);
                OsSleep(10);
                continue;
            }
     
            pthread_mutex_unlock(&mutex);
            LOG("dequeue OK");
     
            task->function(task->arg);
            free(task->arg);
        }
    }
     
    //循环创建线程,并执行取出的任务
    int threadpoolInit(void)
    {
        int i;
        pthread_t threadID[MAX_THREADS_SIZE];
     
        initializeQueue(&taskQueue);
     
        for (i = 0; i < MAX_THREADS_SIZE; i++)
        {
            pthread_create(&threadID[i], NULL, taskHandler, NULL);
        }
    }
    

    注意:在使用信号量时,需要确保每个终端设备在开始处理任务之前都获取了信号量,并在任务完成后释放信号量。这样可以确保多个终端设备能够并发处理任务。


    如果以上回答对您有所帮助,点击一下采纳该答案~谢谢

    评论
  • 前网易架构师-高司机 游戏服务器领域优质创作者 2023-09-27 15:40
    关注

    可以使用无锁队列来做,另外既然你的五个任务是一组,你可以在放到线程执行之前创建好这一组任务都包含哪些线程任务,并传入任务指针,在线程池里处理任务,然后任务处理玩之后修改完成状态并检查当前任务组的完成状态,如果完成了送入到下一个逻辑里,私聊可以提供源码

    评论
  • 程序员Ale-阿乐 2023-09-27 17:00
    关注

    在你的多线程应用中,使用互斥锁确实会导致任务串行执行,因为每个线程在获取互斥锁之后才能执行任务,而其他线程会被阻塞。在不去掉互斥锁的情况下,要实现多线程并发执行任务可以通过以下三种思路的某一种(只提供一下自己的思路,希望对你有帮助)
    任务分发: 将任务分成多个独立的子任务,每个子任务可以使用一个互斥锁来保护自己的数据。这样,不同的子任务可以并行执行,只有在同一个子任务内部需要互斥保护的部分才使用互斥锁。这种方法需要你重新设计任务,使其更细粒度。
    信号量: 除了互斥锁,你还可以考虑使用信号量来控制并发。信号量可以用来限制同时执行任务的线程数量。你可以设置一个信号量,表示最多允许多少个线程同时执行任务。当一个线程开始执行任务时,它会尝试获取信号量,如果信号量达到上限,其他线程将会被阻塞,直到有线程释放信号量。
    线程池: 你可以使用线程池来管理线程的执行。线程池可以控制并发执行的线程数量,当有任务时,线程池会分配线程执行任务。这样可以更好地管理并发,并且避免手动管理线程的创建和销毁。

    评论
  • 心梓知识 2023-09-27 18:33
    关注

    结合GPT给出回答如下请题主参考
    下面是一个基于C语言线程库pthread实现的多线程队列例子,以实现并发处理从PC发出的5个命令,步骤如下:

    1. 定义任务结构体task,其中包含命令信息和处理结果信息。
    typedef struct Task {
      char cmd[20];
      char result[100];
    } Task;
    
    1. 定义一个线程安全的任务队列,使用互斥锁和条件变量来实现。其中,队列容量默认为100个任务。
    #define MAX_TASK_NUM 100
    
    typedef struct TaskQueue {
      Task tasks[MAX_TASK_NUM];
      int head;
      int tail;
      int size;
      pthread_mutex_t mutex;
      pthread_cond_t cond;
    } TaskQueue;
    
    void taskQueueInit(TaskQueue *queue) {
      queue->head = 0;
      queue->tail = 0;
      queue->size = 0;
      pthread_mutex_init(&queue->mutex, NULL);
      pthread_cond_init(&queue->cond, NULL);
    }
    
    void taskQueuePush(TaskQueue *queue, const Task *task) {
      pthread_mutex_lock(&queue->mutex);
      while (queue->size >= MAX_TASK_NUM) {
        pthread_cond_wait(&queue->cond, &queue->mutex);
      }
      queue->tasks[queue->tail] = *task;
      queue->tail = (queue->tail + 1) % MAX_TASK_NUM;
      ++queue->size;
      pthread_cond_signal(&queue->cond);
      pthread_mutex_unlock(&queue->mutex);
    }
    
    void taskQueuePop(TaskQueue *queue, Task *task) {
      pthread_mutex_lock(&queue->mutex);
      while (queue->size <= 0) {
        pthread_cond_wait(&queue->cond, &queue->mutex);
      }
      *task = queue->tasks[queue->head];
      queue->head = (queue->head + 1) % MAX_TASK_NUM;
      --queue->size;
      pthread_cond_signal(&queue->cond);
      pthread_mutex_unlock(&queue->mutex);
    }
    
    1. 定义一个任务处理函数,将任务处理结果存入任务结构体中。这里简单地将命令字符串拼接处理结果字符串。
    void processTask(Task *task) {
      printf("Processing command: %s\n", task->cmd);
      snprintf(task->result, sizeof(task->result), "Result of command '%s'", task->cmd);
    }
    
    1. 定义一个处理任务的线程函数,从任务队列中取出任务并处理。
    void *worker(void *arg) {
      TaskQueue *queue = (TaskQueue *) arg;
      Task task;
      for (;;) {
        taskQueuePop(queue, &task);
        processTask(&task);
      }
      return NULL;
    }
    
    1. 在主函数中创建任务队列和5个处理线程,从PC接收命令并将任务存入队列中。
    int main(void) {
      TaskQueue queue;
      taskQueueInit(&queue);
    
      pthread_t threads[5];
      for (int i = 0; i < 5; ++i) {
        pthread_create(&threads[i], NULL, worker, &queue);
      }
    
      // Receive commands from PC and push them into task queue
      for (int i = 1; i <= 5; ++i) {
        Task task;
        snprintf(task.cmd, sizeof(task.cmd), "Command %d", i);
        taskQueuePush(&queue, &task);
      }
    
      // Wait for threads to finish (should not happen in this case)
      for (int i = 0; i < 5; ++i) {
        pthread_join(threads[i], NULL);
      }
    
      return 0;
    }
    

    完整代码如下:

    评论
  • 数据大魔王 2023-09-27 21:26
    关注

    您的问题出在用了互斥锁导致无法实现多线程并发。在taskHandler函数中,使用了互斥锁来保护对任务队列的访问,这导致了每次只能有一个线程可以从队列中取出任务并执行。
    可以尝试使用信号量来替代互斥锁。信号量允许多个线程同时访问共享资源,但需要控制同时访问资源的线程数量。

    1. 定义一个信号量semaphore,初始值为任务队列中的任务数量,即sem_init(&semaphore, 0, taskQueue.size())
    2. taskHandler函数中,使用sem_wait(&semaphore)来等待信号量,表示获取到一个任务可以执行。同时,在任务执行完成后使用sem_post(&semaphore)来释放信号量,表示有一个任务已经执行完毕,其他线程可以继续获取任务执行。
    3. threadpoolAddTask函数中,在任务入队后使用sem_post(&semaphore)来增加信号量的计数,表示有一个新的任务可以执行。
    评论
  • yy64ll826 2023-09-28 11:08
    关注
    评论

报告相同问题?

问题事件

  • 已结题 (查看结题原因) 9月28日
  • 修改了问题 9月27日
  • 创建了问题 9月27日

悬赏问题

  • ¥30 哈夫曼编码译码器打印树形项目
  • ¥20 求完整顺利登陆QQ邮箱的python代码
  • ¥15 怎么下载MySQL,怎么卸干净原来的MySQL
  • ¥15 网络打印机Ip地址自动获取出现问题
  • ¥15 求局部放电案例库,用于预测局部放电类型
  • ¥100 QT Open62541
  • ¥15 stata合并季度数据和日度数据
  • ¥15 谁能提供rabbitmq,erlang,socat压缩包,记住版本要对应
  • ¥15 Vue3 中使用 `vue-router` 只能跳转到主页面?
  • ¥15 用QT,进行QGIS二次开发,如何在添加栅格图层时,将黑白的矢量图渲染成彩色