我想用C语言(libuv)让两个线程通过管道通信,一个写、一个读,但是写完之后读的callback没有被调用,求解。
/*
 * A libuv write request bundled with the buffer it is sending, so that the
 * completion callback can release both (see free_write_req()).
 */
typedef struct {
/* Must remain the first member: free_write_req() casts the uv_write_t*
 * it receives straight back to a write_req_t*. */
uv_write_t req;
/* Payload being written; buf.base is released with free() in free_write_req(). */
uv_buf_t buf;
} write_req_t;
/* Pipe handle that read_fun() opens on the descriptor of the file "123456";
 * read_stdin() copies incoming data to it. */
uv_pipe_t file_pipe;
/* Pipe handle that read_fun() binds to the name "temp" and listens on;
 * read_stdin() closes it when the input ends. */
uv_pipe_t stdin_pipe;
/* Pipe handle that read_fun() opens on fd 1; read_stdin() copies incoming
 * data to it. */
uv_pipe_t stdout_pipe;
/* Both pointers are assigned the result of uv_default_loop() in read_fun(). */
uv_loop_t *loop1;
uv_loop_t *loop2;
/*
 * Release a write request created as a write_req_t.
 *
 * req must point at the `req` member of a heap-allocated write_req_t; the
 * payload buffer is freed first, then the enclosing structure.
 */
void free_write_req(uv_write_t *req) {
    write_req_t *owner = (write_req_t *)req;
    char *payload = owner->buf.base;

    free(payload);
    free(owner);
}
/*
 * Completion callback for writes to stdout_pipe queued by read_stdin().
 *
 * req    - the finished request (a write_req_t); always released here.
 * status - 0 on success, a negative libuv error code on failure.
 */
void on_stdout_write(uv_write_t *req, int status) {
    if (status < 0) {
        /* Report instead of silently dropping the failure. */
        fprintf(stderr, "stdout write failed (libuv error %d)\n", status);
    }
    free_write_req(req);
}
/*
 * Completion callback for writes to file_pipe queued by read_stdin().
 *
 * req    - the finished request (a write_req_t); always released here.
 * status - 0 on success, a negative libuv error code on failure.
 */
void on_file_write(uv_write_t *req, int status) {
    if (status < 0) {
        /* Report instead of silently dropping the failure. */
        fprintf(stderr, "file write failed (libuv error %d)\n", status);
    }
    free_write_req(req);
}
/*
 * Completion callback for writes issued by the writer side
 * (write_fun() / write_buff()).
 *
 * req    - the finished request (a write_req_t); always released here.
 * status - 0 on success, a negative libuv error code on failure.
 */
void on_stdin_write(uv_write_t *req, int status) {
    if (status < 0) {
        /* Report instead of silently dropping the failure. */
        fprintf(stderr, "pipe write failed (libuv error %d)\n", status);
    }
    free_write_req(req);
}
/*
 * Queue an asynchronous write of `size` bytes taken from buf.base.
 *
 * The bytes are copied into a heap buffer owned by a fresh write_req_t, so the
 * caller may reuse or free buf.base as soon as this function returns.
 *
 * dest - stream to write to; must be used only from the thread that runs the
 *        loop the stream belongs to.
 * size - number of bytes to copy from buf.base.
 * buf  - source buffer; only buf.base is read.
 * cb   - completion callback; it is expected to release the request
 *        (e.g. through free_write_req()).
 *
 * On allocation failure or when uv_write() rejects the request, the problem is
 * reported on stderr and everything allocated here is released.
 */
void write_data(uv_stream_t *dest, size_t size, uv_buf_t buf, uv_write_cb cb) {
    write_req_t *req = malloc(sizeof *req);
    char *copy;
    int rc;

    if (req == NULL) {
        fprintf(stderr, "write_data: out of memory\n");
        return;
    }

    copy = malloc(size);
    if (copy == NULL && size != 0) {
        fprintf(stderr, "write_data: out of memory\n");
        free(req);
        return;
    }
    if (size != 0) {
        memcpy(copy, buf.base, size);
    }
    req->buf = uv_buf_init(copy, size);

    rc = uv_write(&req->req, dest, &req->buf, 1, cb);
    if (rc < 0) {
        /* The request was not queued, so cb will never run: clean up here. */
        fprintf(stderr, "write_data: uv_write failed (libuv error %d)\n", rc);
        free(req->buf.base);
        free(req);
    }
}
/*
 * Connection callback passed to uv_listen(); it only traces that it ran.
 * Neither argument is inspected.
 */
void stdin_listen(uv_stream_t*server, int status)
{
    (void)server;
    (void)status;
    fputs("listen\n", stdout);
}
/*
 * Allocation callback for uv_read_start(): hands libuv a heap buffer of the
 * suggested size. The read callback is responsible for free()ing buf->base.
 *
 * If the allocation fails, a zero-length buffer is returned so that libuv
 * reports UV_ENOBUFS to the read callback instead of being told that a NULL
 * base has `suggested_size` usable bytes.
 */
void alloc_buffer(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) {
    char *base = malloc(suggested_size);

    (void)handle;
    *buf = uv_buf_init(base, base != NULL ? suggested_size : 0);
}
/*
 * Read callback that dumps whatever arrived on `stream`.
 *
 * nread - number of bytes available in buf->base when positive; zero or a
 *         negative libuv error code otherwise.
 * buf   - buffer produced by alloc_buffer(); released here.
 */
void read_stdout(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
    /* The payload is not NUL-terminated, so bound the print by nread. */
    int shown = nread > 0 ? (int)nread : 0;

    (void)stream;
    printf("-----%zd\n", nread);
    printf("%.*s\n", shown, buf->base != NULL ? buf->base : "");
    free(buf->base);
}
/*
 * Read callback for the input stream: copies every chunk to stdout_pipe and
 * file_pipe, and shuts all handles down when the input ends.
 *
 * stream - the stream the data arrived on.
 * nread  - byte count when positive, 0 for "nothing to read right now", or a
 *          negative libuv error code (UV_EOF included).
 * buf    - buffer produced by alloc_buffer(); released here.
 */
void read_stdin(uv_stream_t *stream, ssize_t nread, const uv_buf_t *buf)
{
    static int read_count = 0;

    printf("%zd==\n", nread);
    if (nread < 0) {
        if (nread != UV_EOF) {
            fprintf(stderr, "read failed (libuv error %zd)\n", nread);
        }
        /* After EOF or an error the stream must not be read again, so tear
         * everything down. Close the reporting stream too when it is not
         * stdin_pipe itself (never close the same handle twice). */
        if ((uv_handle_t *)stream != (uv_handle_t *)&stdin_pipe) {
            uv_close((uv_handle_t *)stream, NULL);
        }
        uv_close((uv_handle_t *)&stdin_pipe, NULL);
        uv_close((uv_handle_t *)&stdout_pipe, NULL);
        uv_close((uv_handle_t *)&file_pipe, NULL);
    } else if (nread > 0) {
        write_data((uv_stream_t *)&stdout_pipe, nread, *buf, on_stdout_write);
        write_data((uv_stream_t *)&file_pipe, nread, *buf, on_file_write);
    }

    if (buf->base) {
        read_count++;
        /* The payload is not NUL-terminated, so bound the print by nread. */
        printf("%.*s===%d\n", nread > 0 ? (int)nread : 0, buf->base, read_count);
        free(buf->base);
    }
}
/*
 * Queue an asynchronous write of *buff to dest, completing in on_stdin_write().
 *
 * on_stdin_write() releases the request through free_write_req(), which
 * expects a write_req_t that owns its payload. The request is therefore
 * allocated as a write_req_t holding a private copy of the data; the caller
 * keeps ownership of buff->base.
 *
 * Failures are reported on stderr and leave nothing allocated.
 */
void write_buff(uv_stream_t *dest, uv_buf_t *buff)
{
    write_req_t *req;
    size_t size;
    int rc;

    if (buff == NULL || buff->base == NULL) {
        fprintf(stderr, "write error: no data\n");
        return;
    }
    size = buff->len;

    req = malloc(sizeof *req);
    if (req == NULL) {
        fprintf(stderr, "write error: out of memory\n");
        return;
    }
    req->buf = uv_buf_init(malloc(size), size);
    if (req->buf.base == NULL && size != 0) {
        fprintf(stderr, "write error: out of memory\n");
        free(req);
        return;
    }
    if (size != 0) {
        memcpy(req->buf.base, buff->base, size);
    }

    rc = uv_write(&req->req, dest, &req->buf, 1, on_stdin_write);
    if (rc != 0) {
        /* libuv returns its own error code and does not set errno, so
         * perror() would print an unrelated message. The callback will not
         * run for a rejected request; release it here. */
        fprintf(stderr, "write error: libuv error %d\n", rc);
        free(req->buf.base);
        free(req);
    }
}
void read_fun()
{
loop1 = uv_default_loop();
loop2 = uv_default_loop();
uv_fs_t file_req1;
// int fd1 = uv_fs_open(loop2, &file_req1, "temp", O_CREAT | O_RDWR, 0644, NULL);
uv_pipe_init(loop2, &stdin_pipe, 1);
uv_pipe_open(&stdin_pipe, 0);
uv_pipe_init(loop1, &stdout_pipe, 1);
uv_pipe_open(&stdout_pipe, 1);
uv_pipe_bind(&stdin_pipe,"temp");
uv_listen((uv_stream_t*)&stdin_pipe,128, stdin_listen);
uv_fs_t file_req;
int fd = uv_fs_open(loop1, &file_req, "123456", O_CREAT | O_RDWR, 0644, NULL);
uv_pipe_init(loop1, &file_pipe, 0);
uv_pipe_open(&file_pipe, fd);
uv_read_start((uv_stream_t*)&stdin_pipe, alloc_buffer, read_stdin);
uv_read_start((uv_stream_t*)&stdout_pipe, alloc_buffer, read_stdout);
//uv_listen((uv_stream_t*)&stdin_pipe,128, stdin_listen);
//write_buff((uv_stream_t *)&stdin_pipe);
uv_run(loop1, UV_RUN_DEFAULT);
uv_run(loop2, UV_RUN_DEFAULT);
}
void callback(uv_connect_t *req,int status)
{
printf("1111\n");
free(req);
}
void write_fun()
{
static int write_count = 0;
uv_connect_t* connect_req;
size_t size;
char buff[10] = "123456789";
uv_buf_t write_buf;
size = strlen(buff);
write_buf = uv_buf_init(buff, size);
connect_req = malloc(sizeof(*connect_req));
uv_pipe_connect(connect_req, &stdin_pipe, "temp",callback);
while(1)
{
write_count++;
//write_buff((uv_stream_t *)&stdin_pipe, &write_buf);
write_data((uv_stream_t *)&stdin_pipe, size, write_buf, on_stdin_write);
//write_data((uv_stream_t *)&stdout_pipe, size, write_buf, on_stdout_write);
//printf("write %d\n",write_count);
// break;
sleep(1);
}
}
/* pthread entry point that runs read_fun(); the argument is unused. */
static void *read_thread_entry(void *arg)
{
    (void)arg;
    read_fun();
    return NULL;
}

/* pthread entry point that runs write_fun(); the argument is unused. */
static void *write_thread_entry(void *arg)
{
    (void)arg;
    write_fun();
    return NULL;
}

/*
 * Starts the reader thread, then the writer thread, and waits for both.
 *
 * The thread bodies are wrapped in functions with the signature pthreads
 * requires: calling a `void f()` function through a `void *(*)(void *)`
 * pointer is undefined behaviour.
 *
 * Returns 0 on success, 1 if a thread could not be created.
 */
int main()
{
    pthread_t id_p1;
    pthread_t id_p2;
    int rc;

    rc = pthread_create(&id_p2, NULL, read_thread_entry, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(reader): %s\n", strerror(rc));
        return 1;
    }
    rc = pthread_create(&id_p1, NULL, write_thread_entry, NULL);
    if (rc != 0) {
        fprintf(stderr, "pthread_create(writer): %s\n", strerror(rc));
        return 1;
    }

    pthread_join(id_p1, NULL);
    pthread_join(id_p2, NULL);
    return 0;
}