首先我简单描述一下问题:
我编写了一个程序,来完成文件的拷贝功能。
首先从目标文件读取数据块,然后使用异步写操作写入另外一个文件中。
为了可以捕获每一次写操作的完结时间,我使用了信号机制。当异步写操作结束时,调用HandleIOResult函数进行处理。
现在的问题是:在捕获信号的过程中,会出现捕获的信号个数少于发送的信号个数。
而当我在每一次异步写之前添加usleep以后,则不会出现这种情况,即所有的信号都可以正确的捕获到。
求大神给看看到底是什么问题,应该怎么解决呢?
下面是代码:
有几点需要说明一下:
1、每次读取和写入的文件数据块大小是一样的,都是OBJ_PER_SIZE,总共读取和写入的个数为OBJ_MAX_NUM
2、数组arr_obj_mem主要用来记录每一个文件数据块的相关信息,以及它们写操作的开始和结束时间
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
#include <pthread.h>
#include <list>
#include <aio.h>
#include <stdint.h>
#include <sys/time.h>
#include <unistd.h>
#define OBJ_PER_SIZE 65536 //64KB per obj
#define OBJ_MAX_NUM 10000
const char* file_read = "read.txt";
const char* file_write = "write.txt";
int fd_write = 0;
int remained_obj_num = 0;
int total_obj_num = 0;
int write_ops_num = 0;
int signal_get_num = 0;
pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER;
pthread_cond_t finish = PTHREAD_COND_INITIALIZER;
struct obj_info {
char* data;
uint32_t obj_id;
int data_size;
int primary_dir_id;
uint32_t offset_dir;
uint64_t offset_space;
double write_start_time;
double write_end_time;
double log_start_time;
double log_end_time;
double sync_start_time;
double sync_end_time;
bool finished;
};
struct obj_info arr_obj_mem[OBJ_MAX_NUM];
struct SigData {
int type; //1 for write
int data_id; //match with arr_obj_mem
struct aiocb64* aio_req;
};
struct aiocb64 arr_aio_write[OBJ_MAX_NUM];
struct sigaction sig_act_write[OBJ_MAX_NUM];
struct SigData arr_write_data[OBJ_MAX_NUM];
double GetMstime(void)
{
struct timeval curr_time;
gettimeofday(&curr_time, NULL);
return (curr_time.tv_sec * 1000.0 + curr_time.tv_usec / 1000.0);
}
bool AllFinished () {
int i;
for(i=0;i<total_obj_num;i++) {
if(!arr_obj_mem[i].finished)
return false;
}
return true;
}
void HandleIOResult(int signo, siginfo_t *info, void* context) {
struct SigData *req;
int ret = 0;
signal_get_num++;
if(info->si_signo == SIGIO) {
req = (SigData*)info->si_value.sival_ptr;
if(req->type == 1) { //handle write operations
if(aio_error64(req->aio_req) == 0) {
ret = aio_return64(req->aio_req);
int obj_id = req->data_id;
if(write_ops_num >= remained_obj_num) {
pthread_cond_signal(&finish);
}
}
}
}
}
void DoWriteAndLog(int obj_id) {
int ret = 0;
sigemptyset(&sig_act_write[obj_id].sa_mask);
sig_act_write[obj_id].sa_flags = SA_SIGINFO;
sig_act_write[obj_id].sa_sigaction = HandleIOResult;
bzero( (char *)&arr_aio_write[obj_id], sizeof(struct aiocb64) );
arr_aio_write[obj_id].aio_buf = arr_obj_mem[obj_id].data;
arr_aio_write[obj_id].aio_fildes = fd_write;
arr_aio_write[obj_id].aio_nbytes = arr_obj_mem[obj_id].data_size;
arr_aio_write[obj_id].aio_offset = arr_obj_mem[obj_id].offset_space;
arr_aio_write[obj_id].aio_sigevent.sigev_notify = SIGEV_SIGNAL;
arr_aio_write[obj_id].aio_sigevent.sigev_signo = SIGIO;
arr_write_data[obj_id].type = 1;
arr_write_data[obj_id].data_id = obj_id;
arr_write_data[obj_id].aio_req = &arr_aio_write[obj_id];
sigaction(SIGIO, &sig_act_write[obj_id], NULL);
arr_obj_mem[obj_id].write_start_time = GetMstime();
ret = aio_write64(&arr_aio_write[obj_id]);
write_ops_num++;
if(ret != 0)
printf("aio_write64 error \n");
}
void ReadAndWrite () {
fd_write = open(file_write, O_CREAT | O_WRONLY);
if(fd_write < 0) {
printf("open file %s failed!\n", file_write);
return ;
}
int fd_read = open(file_read, O_RDONLY);
if(fd_read < 0) {
printf("open file %s failed\n", file_read);
return ;
}
int i = 0;
uint64_t offset = 0;
int ret = 0;
printf("start to read file %s!\n", file_read);
while(true) {
if(i >= OBJ_MAX_NUM)
arr_obj_mem[i].data = new char[OBJ_PER_SIZE + 1];
ret = pread(fd_read, arr_obj_mem[i].data, OBJ_PER_SIZE, offset);
if(ret < 0) {
printf("read file %s failed!\n", file_read);
return ;
}
if(ret == 0) {
printf("read zero bytes!\n");
arr_obj_mem[i].obj_id = i;
arr_obj_mem[i].data_size = ret;
arr_obj_mem[i].offset_space = offset;
DoWriteAndLog(i);
offset += ret;
i++;
pthread_mutex_lock(&mutex);
remained_obj_num++;
pthread_mutex_unlock(&mutex);
total_obj_num++;
if(ret != OBJ_PER_SIZE) {
break;
}
}
}
double total_write_time = 0;
void ResultCount() {
int i;
for(i=0;i<total_obj_num;i++) {
if(arr_obj_mem[i].finished) {
total_write_time += (arr_obj_mem[i].write_end_time - arr_obj_mem[i].write_start_time);
}
}
printf("total_write_time = %.2f\n", total_write_time);
}
int main(int argc, char **argv)
{
ReadAndWrite();
pthread_mutex_lock(&mutex);
while(!AllFinished()) {
pthread_cond_wait(&finish, &mutex);
}
pthread_mutex_unlock(&mutex);
printf("All Job Done!\n");
ResultCount();
return 0;
}