linux环境编程(2): 使用pipe完成进程间通信

linux系统内核为上层应用程序提供了多种进程间通信(IPC)的手段,适用于不同的场景,有些解决进程间数据传递的问题,另一些则解决进程间的同步问题。对于同样一种IPC机制,又有不同的API供应用程序使用,目前有POSIX IPC以及System V IPC可以为应用程序提供服务。后续的系列文章将逐一介绍消息队列,共享内存,信号量,socket,fifo等进程间通信方法,本篇文章主要总结了管道相关系统调用的使用方式。文中代码可以在这个代码仓库中获取,代码中使用了我自己实现的一个单元测试框架,对测试框架感兴趣的同学可以参考上一篇文章。

1. 写在前面

2. pipe介绍



管道有两个特点: 1) 通常只能在有亲缘关系的进程之间进行通信; 2) 阅后即焚;有亲缘关系是指,通信的两个进程可以是父子进程或者兄弟进程,这里的父子和兄弟是一个广义的概念, 子进程可以是父进程调用了多次fork创建出来的,而不仅局限在只经过一次fork,总之,只要通信双方的进程拿到了管道的文件描述符就可以使用管道了。说”阅后即焚“是因为管道中的数据在被进程读取之后就会被管道清除掉。有一个形象的比喻说,管道就像某个进程家族各个成员之间传递情报的中转站,情报内容阅后即焚。

3. pipe的基本使用


3.1 自言自语


CUTEST_CASE(basic_pipe, talking_to_myself) {     int pipefd[2];     pipe(pipefd);      const char *msg = "I'm talking to myself";     write(pipefd[1], msg, strlen(msg));      char buf[32];     read(pipefd[0], buf, 32);     printf("talking_to_myself: %sn", buf);      close(pipefd[0]);     close(pipefd[1]); } 

3.2 父进程向子进程传递数据


CUTEST_CASE(basic_pipe, parent2child) {     int pipefd[2];     pipe(pipefd);      const char *msg = "parent write, child read";     write(pipefd[1], msg, strlen(msg));      if (fork() == 0) {         close(pipefd[1]);         char buf[64];         memset(buf, 0, 64);         read(pipefd[0], buf, 64);         printf("parent2child: %sn", buf);         exit(0);     }      close(pipefd[0]);     close(pipefd[1]); } 

3.2 自进程向父进程传递数据


CUTEST_CASE(basic_pipe, child2parent) {     int pipefd[2];     pipe(pipefd);      if (fork() == 0) {         close(pipefd[0]);         const char *msg = "parent read, child write";         write(pipefd[1], msg, strlen(msg));         close(pipefd[1]);         exit(0);     }      close(pipefd[1]);      char buf[64];     memset(buf, 0, 64);     read(pipefd[0], buf, 64);     printf("child2parent: %sn", buf);     close(pipefd[0]); } 

3.3 父进程向多个子进程传递数据


 void fork_child_read(int id, int pipefd[2], const char *msg_pregix) {     if (fork() == 0) {         close(pipefd[1]);         int n;         read(pipefd[0], &n, sizeof(int));         printf("%s: child %d get data %dn", msg_pregix, id, n);         close(pipefd[0]);         exit(0);     } }  CUTEST_CASE(basic_pipe, parent2children) {     int pipefd[2];     pipe(pipefd);      for (int i = 1; i <= 10; i++)         write(pipefd[1], &i, sizeof(int));      const char *msg_prefix = "parent2children:";     fork_child_read(1, pipefd, msg_prefix);     fork_child_read(2, pipefd, msg_prefix);     fork_child_read(3, pipefd, msg_prefix);      close(pipefd[0]);     close(pipefd[1]); } 

3.4 父进程接收多个子进程的数据


void fork_child_write(int pipefd[2], int data) {     if (fork() == 0) {         close(pipefd[0]);         write(pipefd[1], &data, sizeof(int));         close(pipefd[1]);         exit(0);     } }  CUTEST_CASE(basic_pipe, children2parent) {     int pipefd[2];     pipe(pipefd);      int data[] = {512, 1024};      fork_child_write(pipefd, data[0]);     fork_child_write(pipefd, data[1]);      close(pipefd[1]);     int n;     while (read(pipefd[0], &n, sizeof(int)) == sizeof(int)) {         printf("children2parent: get data %dn", n);     }     close(pipefd[0]); } 

3.5 兄弟进程之间传递数据


CUTEST_CASE(basic_pipe, two_children) {     int pipefd[2];     pipe(pipefd);      const char *msg = "pipe between two children";     if (fork() == 0) {         close(pipefd[0]);         write(pipefd[1], msg, strlen(msg));         close(pipefd[1]);         exit(0);     }      if (fork() == 0) {         close(pipefd[1]);         char buf[64];         memset(buf, 0, 64);         read(pipefd[0], buf, 64);         printf("two_children: %sn", buf);         close(pipefd[0]);         exit(0);     }      close(pipefd[0]);     close(pipefd[1]); } 

3.6 阻塞和非阻塞的问题


CUTEST_CASE(basic_pipe, blocking_read) {     int pipefd[2];     pipe(pipefd);      if (fork() == 0) {         /* NOTE: remove the comment below if you don't want child process          * blocking while reading data from pipe. Otherwise you will see that          * there is still a "basic-pipe" process after you finish this test, and          * you have to kill it manually.*/         // close(pipefd[1]);          int num;         read(pipefd[0], &num, sizeof(int));          /* NOTE: since the write end of pipe is a valid file descriptor in          * current process, the print below should never execute.*/         printf("should NEVER goes heren");         exit(0);     }      close(pipefd[0]);     close(pipefd[1]);     printf("blocking_read: parent process exitn"); } 


3.7 测试执行结果


[junan@arch1 test-all]$ make install [junan@arch1 test-all]$ ./script/ basic-pipe blocking_read: parent process exit two_children: pipe between two children children2parent: get data 512 children2parent: get data 1024 parent2children:: child 1 get data 1 parent2children:: child 2 get data 2 parent2children:: child 3 get data 3 child2parent: parent read, child write talking_to_myself: I'm talking to myself cutest summary:         [basic_pipe] suit result: 7/7         [basic_pipe::blocking_read] case result: Pass         [basic_pipe::two_children] case result: Pass         [basic_pipe::children2parent] case result: Pass         [basic_pipe::parent2children] case result: Pass         [basic_pipe::child2parent] case result: Pass         [basic_pipe::parent2child] case result: Pass         [basic_pipe::talking_to_myself] case result: Pass parent2child: parent write, child read [junan@arch1 test-all]$ ps -e|grep basic-pipe   18866 pts/2    00:00:00 basic-pipe [junan@arch1 test-all]$ kill -9 18866 [junan@arch1 test-all]$ ps -e|grep basic-pipe [junan@arch1 test-all]$  

4. pipe的进阶使用

以上的几段示例代码说明了管道的一些基本使用方法和注意事项,下面看一个使用管道和多进程生成质数的问题。我们的需求是这样的,给定一个整数nmax,生成[2, nmax]区间上的所有质数,并且要求生成质数的核心逻辑使用管道和多进程。第一次碰到这个问题是在xv6操作系统的lab中,也是为了说明pipefork的使用。

看到这里,不妨先稍微思考一下?一个简单的想法可能是这样的,首先有一个函数,其功能是判断输入的n是否是质数,接下来遍历[2, nmax]上的整数,并且用之前的函数把质数都过滤出来,但问题是如何用管道和多进程实现这个函数的过滤功能呢?OK, 思考结束,来看看管道加多进程版本的质数生成器算法思路:

linux环境编程(2): 使用pipe完成进程间通信


void generate_primes(int pipe1[2]) {     close(pipe1[1]);      int prime = 0;     int err = read(pipe1[0], &prime, sizeof(int));     if (err <= 0) {         close(pipe1[0]);         return;     }     printf("%dn", prime);      int pipe2[2];     pipe(pipe2);      pid_t pid = fork();     if (pid == 0) {         generate_primes(pipe2);     } else {         int num = 0;         while ((err = read(pipe1[0], &num, sizeof(int))) > 0) {             if (num % prime) {                 write(pipe2[1], &num, sizeof(int));             }         }     }      close(pipe1[0]);     close(pipe2[0]);     close(pipe2[1]);     exit(0); }  CUTEST_SUIT(prime_numbers_pipe)  CUTEST_CASE(prime_numbers_pipe, prime_number_max30) {     int nmax = 30;      int pipe1[2];     pipe(pipe1);      for (int i = 2; i <= nmax; ++i)         write(pipe1[1], &i, sizeof(int));      if (fork() == 0) {         generate_primes(pipe1);     }      close(pipe1[0]);     close(pipe1[1]); } 


[junan@arch1 test-all]$ ./script/ prime-number-pipe cutest summary:         [prime_numbers_pipe] suit result: 1/1         [prime_numbers_pipe::prime_number_max30] case result: Pass 2 3 5 7 11 13 17 19 23 29 [junan@arch1 test-all]$  

5. 写在最后


