lyygliyuzhe 2015-05-09 04:58
浏览 785

使用信号机制捕捉异步IO时,出现个别IO信号丢失

首先我简单描述一下问题:
我编写了一个程序,来完成文件的拷贝功能。
首先从目标文件读取数据块,然后使用异步写操作写入另外一个文件中。
为了可以捕获每一次写操作的完结时间,我使用了信号机制。当异步写操作结束时,调用HandleIOResult函数进行处理。
现在的问题是:在捕获信号的过程中,会出现捕获的信号个数少于发送的信号个数。
而当我在每一次异步写之前添加usleep以后,则不会出现这种情况,即所有的信号都可以正确的捕获到。
求大神给看看到底是什么问题,应该怎么解决呢?
下面是代码:
有几点需要说明一下:
1、每次读取和写入的文件数据块大小是一样的,都是OBJ_PER_SIZE,总共读取和写入的个数为OBJ_MAX_NUM
2、数组arr_obj_mem主要用来记录每一个文件数据块的相关信息,以及它们写操作的开始和结束时间

 #include <stdio.h> 
#include <fcntl.h> 
#include <string.h> 
#include <pthread.h> 
#include <list> 
#include <aio.h> 
#include <stdint.h> 
#include <sys/time.h> 
#include <unistd.h> 

#define OBJ_PER_SIZE 65536    //64KB per obj 
#define OBJ_MAX_NUM 10000 

const char* file_read = "read.txt"; 
const char* file_write = "write.txt"; 

int fd_write = 0; 

int remained_obj_num = 0; 
int total_obj_num = 0; 
int write_ops_num = 0; 
int signal_get_num = 0; 

pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; 
pthread_cond_t finish = PTHREAD_COND_INITIALIZER; 

struct obj_info { 
  char* data; 
  uint32_t obj_id; 
  int data_size; 
  int primary_dir_id; 
  uint32_t offset_dir; 
  uint64_t offset_space; 
  double write_start_time; 
  double write_end_time; 
  double log_start_time; 
  double log_end_time; 
  double sync_start_time; 
  double sync_end_time; 
  bool finished; 
}; 

struct obj_info arr_obj_mem[OBJ_MAX_NUM]; 

struct SigData { 
  int type;     //1 for write 
  int data_id;  //match with arr_obj_mem 
  struct aiocb64* aio_req; 
}; 

struct aiocb64 arr_aio_write[OBJ_MAX_NUM]; 
struct sigaction sig_act_write[OBJ_MAX_NUM]; 
struct SigData arr_write_data[OBJ_MAX_NUM]; 

double GetMstime(void) 
{ 
  struct timeval curr_time; 
  gettimeofday(&curr_time, NULL); 
  return (curr_time.tv_sec * 1000.0 + curr_time.tv_usec / 1000.0); 
} 

bool AllFinished () { 
  int i; 
  for(i=0;i<total_obj_num;i++) { 
    if(!arr_obj_mem[i].finished) 
      return false; 
  } 
  return true; 
} 

void HandleIOResult(int signo, siginfo_t *info, void* context) { 
  struct SigData *req; 
  int ret = 0; 
  signal_get_num++; 
  if(info->si_signo == SIGIO) { 
    req = (SigData*)info->si_value.sival_ptr; 
    if(req->type == 1) {    //handle write operations 
      if(aio_error64(req->aio_req) == 0) { 
        ret = aio_return64(req->aio_req); 
        int obj_id = req->data_id; 
        if(write_ops_num >= remained_obj_num) { 
          pthread_cond_signal(&finish); 
        } 
      } 
    } 
  } 
} 

void DoWriteAndLog(int obj_id) { 
  int ret = 0; 
  sigemptyset(&sig_act_write[obj_id].sa_mask); 
  sig_act_write[obj_id].sa_flags = SA_SIGINFO; 
  sig_act_write[obj_id].sa_sigaction = HandleIOResult; 
  bzero( (char *)&arr_aio_write[obj_id], sizeof(struct aiocb64) ); 

  arr_aio_write[obj_id].aio_buf = arr_obj_mem[obj_id].data; 
  arr_aio_write[obj_id].aio_fildes = fd_write; 
  arr_aio_write[obj_id].aio_nbytes = arr_obj_mem[obj_id].data_size; 
  arr_aio_write[obj_id].aio_offset = arr_obj_mem[obj_id].offset_space;
  arr_aio_write[obj_id].aio_sigevent.sigev_notify = SIGEV_SIGNAL; 
  arr_aio_write[obj_id].aio_sigevent.sigev_signo = SIGIO; 

  arr_write_data[obj_id].type = 1; 
  arr_write_data[obj_id].data_id = obj_id; 
  arr_write_data[obj_id].aio_req = &arr_aio_write[obj_id]; 
  sigaction(SIGIO, &sig_act_write[obj_id], NULL); 

  arr_obj_mem[obj_id].write_start_time = GetMstime(); 

  ret = aio_write64(&arr_aio_write[obj_id]); 
  write_ops_num++; 
  if(ret != 0) 
    printf("aio_write64 error \n"); 
} 

void ReadAndWrite () { 
  fd_write = open(file_write, O_CREAT | O_WRONLY); 
  if(fd_write < 0) { 
    printf("open file %s failed!\n", file_write); 
    return ; 
  } 
  int fd_read = open(file_read, O_RDONLY); 
  if(fd_read < 0) { 
    printf("open file %s failed\n", file_read); 
    return ; 
  } 
  int i = 0; 
  uint64_t offset = 0; 
  int ret = 0; 
  printf("start to read file %s!\n", file_read); 
  while(true) { 
    if(i >= OBJ_MAX_NUM) 
    arr_obj_mem[i].data = new char[OBJ_PER_SIZE + 1]; 
    ret = pread(fd_read, arr_obj_mem[i].data, OBJ_PER_SIZE, offset); 
    if(ret < 0) { 
      printf("read file %s failed!\n", file_read); 
      return ; 
    } 
    if(ret == 0) { 
      printf("read zero bytes!\n"); 
    arr_obj_mem[i].obj_id = i; 
    arr_obj_mem[i].data_size = ret; 
    arr_obj_mem[i].offset_space = offset; 

    DoWriteAndLog(i); 
    offset += ret; 
    i++; 
    pthread_mutex_lock(&mutex); 
    remained_obj_num++; 
    pthread_mutex_unlock(&mutex); 
    total_obj_num++; 
    if(ret != OBJ_PER_SIZE) { 
      break; 
    } 
  } 
}

double total_write_time = 0;

void ResultCount() {
  int i;
  for(i=0;i<total_obj_num;i++) {
    if(arr_obj_mem[i].finished) {
      total_write_time += (arr_obj_mem[i].write_end_time - arr_obj_mem[i].write_start_time);
    }
  }
  printf("total_write_time = %.2f\n", total_write_time);
}

int main(int argc, char **argv)
{
  ReadAndWrite();
  pthread_mutex_lock(&mutex);
  while(!AllFinished()) {
    pthread_cond_wait(&finish, &mutex);
  }
  pthread_mutex_unlock(&mutex);
  printf("All Job Done!\n");
  ResultCount();
  return 0;
}
  • 写回答

0条回答 默认 最新

    报告相同问题?

    悬赏问题

    • ¥15 程序不包含适用于入口点的静态Main方法
    • ¥15 素材场景中光线烘焙后灯光失效
    • ¥15 请教一下各位,为什么我这个没有实现模拟点击
    • ¥15 执行 virtuoso 命令后,界面没有,cadence 启动不起来
    • ¥50 comfyui下连接animatediff节点生成视频质量非常差的原因
    • ¥20 有关区间dp的问题求解
    • ¥15 多电路系统共用电源的串扰问题
    • ¥15 slam rangenet++配置
    • ¥15 有没有研究水声通信方面的帮我改俩matlab代码
    • ¥15 ubuntu子系统密码忘记