【MPI学习5】MPI并行程序设计模式:组通信MPI程序设计

相关章节:第13章组通信MPI程序设计。

MPI组通信与点到点通信的一个重要区别就是:组通信需要特定组内所有成员参与,而点对点通信只涉及到发送方和接收方。

由于需要组内所有成员参与,因此也是一种比较复杂的通信方式。程序员在设计组通信语句的时候,需要同时考虑两点:

a. 程序运行起来之后,当前正在运行的进程的行为方式

b. 将组通信作为一个整体,考虑所有进程的行为方式

(1)概述

组通信从功能上实现了三个方面:

a. 通信:完成组内数据传输(广播、收集、散发、组收集、全互换各种数据交换传输方式

b. 同步:能够让组内所有进程在特定的位置上执行进度上取得一致,组通信虽然可以有同步功能,但是并不意味这同步一定发生(MPI_Barrier同步函数

c. 计算:通过给定数据的OP归约操作完成(分为MPI预定义OP归约操作、用户自定义OP归约操作

下面分别阐述上述三个方面的相关代码示例,采用的方法是把书上的代码段扩充成完整的可执行的程序来体会。

(2)各部分代码

下面阐述几个代码示例,把通信、同步、计算的例子都融入到具体的代码中。

示例1(Bcast广播通信)

从root进程读入一个数,并Bcast广播给组内所有进程并打印到终端上,代码如下:

#include "mpi.h"
#include <stdio.h>
#include <stdlib.h>

int main(int argc, char *argv[])
{
    int rank, value=0;
    MPI_Init(&argc, &argv);
    MPI_Comm_rank(MPI_COMM_WORLD, &rank);
    while(value>=0)
    {
        if (0==rank) {
            scanf("%d", &value);
        }
        MPI_Bcast(&value, 1, MPI_INT, 0, MPI_COMM_WORLD);
        printf("Process %d got %d
", rank, value);
    }
    MPI_Finalize();
    return 0;
}

代码执行结果如下:

上述代码中涉及到了Bcast函数的基本用法,在每个进程中都要出现,然后通过函数参数中的root进程号,进而区分哪个进程是发送方。

示例2(Alltoall全互换通信)

全互换。全互换是形式上最复杂的一种组通信方式,内容上是各种通信形式的全集。

全互换通信模式中,在组内所有通信涉及到的进程中:

a. 每个进程发送给其他进程的数据是不同的

b. 每个进程从其他进程获得数据也是不同的

这样描述还是太抽象,通过具体的图来解释一下:

上图描述了全互换发生前和发生后,数据变化的情况:

纵轴代表不同进程编号;左图的横轴代表发送缓冲区的段位大小,右图的横轴代表接受缓冲区的大小。

a. 第i个进程的发送缓冲区中的第j段数据(即左图Aij)代表的意义是:第i个进程发送给第j个进程的数据

b. 第j个进程的接受缓冲区中的第i段数据(即右图的Aji)代表的意义是:第j个进程从第i个进程中接收到的数据

结合a、b两点,我们可以知道全互换这种方式类似:将“进程-发送缓冲区”矩阵,转置成“进程-接受缓冲区”矩阵

对于同一对(i,j),左图的Aij的大小要求一定与右图的Aji的大小一样。

比如第0个进程发送给第2个进程的数据(即A02),必须与第2个进程给第0个进程留出来的接受缓冲区大小一致(即A20),满足这样的条件,就实现了全互换。

每个进程在全通信中既向所有进程发送数据,又同时从所有进程接受数据,但是需要注意上述的发送、接受缓冲区大小匹配。

看如下的代码:

 1 #include "mpi.h"
 2 #include <stdlib.h>
 3 #include <stdio.h>
 4 #include <string.h>
 5 #include <errno.h>
 6 
 7 int main(int argc, char *argv[])
 8 {
 9     int rank, size;
10     int chunk = 2; // 发送到一个进程的数据块大小
11     int i,j;
12     int *sbuf, *rbuf;
13 
14     MPI_Init(&argc, &argv);
15     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
16     MPI_Comm_size(MPI_COMM_WORLD, &size);
17     // 申请发送缓冲区 和 接收缓冲区
18     sbuf = (int*)malloc(size*chunk*sizeof(int)); // 申请发送缓冲区
19     if (!sbuf) {
20         perror("can't allocate send buffer");
21         MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
22     }
23     rbuf = (int*)malloc(size*chunk*sizeof(int)); // 申请接收缓冲区
24     if (!rbuf) {
25         perror("can't allocate recv buffer");
26         free(sbuf);
27         MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
28     }
29     // 设置每个进程的发送缓冲区 和 接收缓冲区的数据
30     for(i=0; i<size; i++) // i代表进程编号
31     {
32         for(j=0; j<chunk; j++) // j代表myid发送给进程i的第j个数据
33         {
34             sbuf[i*chunk+j] = rank+i*chunk+j; // 设置发送缓冲区的数据
35             printf("myid = %d, send to id = %d, data[%d] = %d
",rank ,i ,j, sbuf[i*chunk+j]);
36             rbuf[i*chunk+j] = 0; // 接收缓冲区清零
37         }
38     }
39     // 执行all to all调用
40     MPI_Alltoall(sbuf, chunk, MPI_INT, rbuf, chunk, MPI_INT, MPI_COMM_WORLD);
41     // 打印接收缓冲区从其他进程接收的数据
42     for(i=0; i<size; i++)
43     {
44         for(j=0; j<chunk; j++)
45         {
46             printf("myid = %d, recv from id = %d, data[%d] = %d
",rank ,i ,j, rbuf[i*chunk+j]);
47         }
48     }
49     free(rbuf);
50     free(sbuf);
51     MPI_Finalize();
52     return 0;
53 }

程序执行结果如下:

一共有2个进程,每个进程共有4个数据;可以看到执行全互换后的效果。

示例3

利用近似积分公式求圆周率π的大小。(Bcast广播、Barrier同步、REDUCE归约)

圆周率的大小可以用一个积分表达式来求解,求这个积分的过程可以用MPI组通信技术实现

代码如下:

 1 #include "mpi.h"
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 
 5 double f(double);
 6 
 7 int main(int argc, char *argv[])
 8 {
 9     int rank, size;
10     int n; // 细分出来的小矩阵个数
11     double pi; // 存放pi的计算值
12     
13     MPI_Init(&argc, &argv);
14     MPI_Comm_size(MPI_COMM_WORLD, &size);
15     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
16     if (0==rank) {
17         printf("number of small rectangle:
");
18         scanf("%d",&n);
19     }
20     MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD); // 将n值广播到各个进程中
21     int i;
22     double x, h, sum=0.0;
23     for (i=rank+1; i<=n; i+=size)
24     {
25         x = (i-0.5)/(double)n;
26         sum += f(x);
27     }
28     sum = sum/n;
29     MPI_Reduce(&sum, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD); 
30     if (0==rank) {
31         printf("Approximately of pi is : %.16f
",pi);
32         fflush(stdout);
33     }
34     MPI_Finalize();
35 }
36 
37 double f(double x) { return 4.0/(1.0+x*x); }

上述代码是按照如下思路实现的:

比如总共要把积分区间按X轴划分成1000份,即计算任务是求1000个小矩形的面积,

而MPI计算进程共有5个;则为了负载均衡,考虑让每个计算节点平均搞定200个小矩形的面积。

代码技巧:

这里有个需要处理的地方是:如何告诉每个计算节点(进程),需要计算哪些小矩形的面积呢?

一种直观的思路是,让第0个进程处理0~199小矩形,让第1个进程处理200~399小矩形...,以此类推。

上述的思路没有错误,但是对边界条件的处理可能麻烦一些:因为用这种连续区间的任务分配方式,就需要知道提前知道头和尾各在哪里,需要额外处理。

书上给的代码技巧是通过间隔划分的方式,比如有5个计算进程,第0个进程计算0,5,10,...这些矩形面积,第1个进程计算1,6,11,...这些矩形面积。

这样一来,任务分配的代码就简单了,一个循环直接搞定,而且不同进程不用区别对待。

上述代码执行结果如下:

小矩形个数越多,计算的值约逼近真实值。

示例4

有一个大数据集,每个计算进程中各有数据集中的一部分,现在想让每个进程都有全部的数据集(Bcast广播、Barrier同步)

代码如下:

 1 #include "mpi.h"
 2 #include <stdlib.h>
 3 #include <stdio.h>
 4 
 5 int main(int argc, char *argv[])
 6 {
 7     int rank,size,i;
 8     int *table;
 9     int errors = 0;
10     MPI_Aint address;
11     MPI_Datatype type, newtype;
12     int lens;
13 
14     MPI_Init(&argc, &argv);
15     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
16     MPI_Comm_size(MPI_COMM_WORLD, &size);
17     table = (int*)calloc(size, sizeof(int));
18     table[rank] = rank + 1; // 准备要广播的数据
19     MPI_Barrier(MPI_COMM_WORLD);
20     for(i=0; i<size; i++) MPI_Bcast(&table[i], 1, MPI_INT, i, MPI_COMM_WORLD); // 每个进程只当一次root
21     for(i=0; i<size; i++)
22     {
23         if (table[i]!=i+1) {
24             errors++;
25         }
26     }
27     MPI_Barrier(MPI_COMM_WORLD);
28     if (0==rank) {
29         for(i=0;i<size;i++) printf("table[%d]:%d
",i,table[i]);
30     }
31     MPI_Finalize();
32 }

代码执行结果如下:

代码分析:

a. line19执行了第一次Barrier同步,目的是让各个进程都准备好各自拥有数据集的一部分(这里同步的目的是,让各个进程一方面table分配好空间,并且准备好数据)

b. line27执行了第二次Barrier同步,目的是让Bcast都完成,然后在root进程中查看table数据是否都发送过去了

其中line20的代码技巧也值得学习,每个进程只当一次root,而其余的时候全都是当配角,通信中的接收一方

示例5

组通信中的死锁。组通信的相同函数有可能是同步的,也有可能是异步的,这个受软硬件平台影响;为了构建可移植性好的代码,在设计组通信程序的时候,无论是异步还是同步的,都应该保证程序不出现死锁。下面用几个例子解释死锁出现的情况。

代码1. 同一个通信域中的死锁示例

 1 #include "mpi.h"
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 #include <unistd.h> 
 5 
 6 #define LEN 10
 7 
 8 int main(int argc, char *argv[])
 9 {
10     int rank,size;
11 
12     MPI_Init(&argc, &argv);
13     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
14     MPI_Comm_size(MPI_COMM_WORLD, &size);
15 
16     if(2!=size) MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
17     int* buf1 = (int*)malloc(sizeof(int)*LEN);
18     int* buf2 = (int*)malloc(sizeof(int)*LEN);
19     buf1[0] = 1;
20     buf2[0] = 1;
21     if (0==rank) {
22         MPI_Bcast(buf1,LEN,MPI_INT,0,MPI_COMM_WORLD);
23         MPI_Bcast(buf2,LEN,MPI_INT,1,MPI_COMM_WORLD);
24         printf("0 done
");
25     }
26     if (1==rank) {
27         MPI_Bcast(buf2,LEN,MPI_INT,1,MPI_COMM_WORLD);
28         MPI_Bcast(buf1,LEN,MPI_INT,0,MPI_COMM_WORLD);
29         printf("1 done
");
30     }
31     free(buf1);
32     free(buf2);
33     MPI_Finalize();
34 }

代码执行结果如下:

可以看到并没有出现deadlock。原因是由于LEN定义的发送和接收数据长度太小,MPI给Bcast执行方式选择的是缓存非同步方式,即Bcast不必等到所有需要参加组通信的进程都完成了再返回。

如果我们把LEN定义的长度改为100000,则执行结果如下:

执行结果表明deadlock出现了。原因是由于LEN定义的发送和接受的数据长度较大,MPI给Bcast选择的执行方式是同步方式,即Bcast必须等到所有需要参加广播通信的进程都完成了,才能够正确返回。

书上并没有提Bcast是异步还是同步的事情,我在stackoverflow上提问才知道了上面的道理(http://stackoverflow.com/questions/35628524/why-this-mpi-bcast-related-code-not-deadlock)。感谢stackoverflow上的认真回答。

代码2. 不同通信域中的死锁示例

这部分内容与后面第15章的进程组和通信域知识相关,需要调换顺序提前补充。

总共有3个进程,三个进程在MPI_COMM_WORLD中的rank分别是{0,1,2}

现在构造三个新的通信域comm0 comm1 comm2

comm0中包含MPI_COMM_WORLD中rank为0,1的进程

comm1中包含MPI_COMM_WORLD中rank为1,2的进程

comm2中包含MPI_COMM_WORLD中rank为2,0的进程

需要注意是,虽然进程客观上只有3个,但是相同的进程在不同的通信域中的rank号是不同的。

下面的代码构造了一个循环依赖的deadlock

 1 #include "mpi.h"
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 
 5 #define LEN 100000
 6 
 7 int main(int argc, char *argv[])
 8 {
 9     MPI_Init(&argc, &argv);
10     int rank, size;
11     MPI_Comm_size(MPI_COMM_WORLD, &size);
12     if (3!=size) {
13         printf("wrong proc num
");
14         MPI_Abort(MPI_COMM_WORLD, EXIT_FAILURE);
15     }
16     const int ranks[3] = {0,1,2};
17     MPI_Group world_group;
18     MPI_Comm_group(MPI_COMM_WORLD, &world_group);
19     // 构造三个新的进程组对象
20     MPI_Group group0, group1, group2;
21     const int ranks0[2] = {0,1};
22     const int ranks1[2] = {1,2};
23     const int ranks2[2] = {2,0};
24     MPI_Group_incl(world_group, 2, ranks0, &group0);
25     MPI_Group_incl(world_group, 2, ranks1, &group1);
26     MPI_Group_incl(world_group, 2, ranks2, &group2);
27 
28     // 利用三个进程对象构造通信域
29     MPI_Comm comm0, comm1, comm2;
30     MPI_Comm_create(MPI_COMM_WORLD, group0, &comm0);
31     MPI_Comm_create(MPI_COMM_WORLD, group1, &comm1);
32     MPI_Comm_create(MPI_COMM_WORLD, group2, &comm2);
33     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
34     int *buf1 = (int*)malloc(sizeof(int)*LEN);
35     int *buf2 = (int*)malloc(sizeof(int)*LEN);
36     buf1[0] = 1;
37     buf2[0] = 1;
38     int sub_rank;
39     if (0==rank) {
40         MPI_Group_translate_ranks(world_group,1,&ranks[0],group0,&sub_rank);
41         MPI_Bcast(buf1, LEN, MPI_INT, sub_rank, comm0);
42         MPI_Group_translate_ranks(world_group,1,&ranks[2],group2,&sub_rank);
43         MPI_Bcast(buf2, LEN, MPI_INT, sub_rank, comm2);
44     }
45     if (1==rank) {
46         MPI_Group_translate_ranks(world_group,1,&ranks[1],group1,&sub_rank);
47         MPI_Bcast(buf1, LEN, MPI_INT, sub_rank, comm1);
48         MPI_Group_translate_ranks(world_group,1,&ranks[0],group0,&sub_rank);
49         MPI_Bcast(buf2, LEN, MPI_INT, sub_rank, comm0);
50     }
51     if (2==rank) {
52         MPI_Group_translate_ranks(world_group,1,&ranks[2],group2,&sub_rank);
53         MPI_Bcast(buf1, LEN, MPI_INT, sub_rank, comm2);
54         MPI_Group_translate_ranks(world_group,1,&ranks[1],group1,&sub_rank);
55         MPI_Bcast(buf2, LEN, MPI_INT, sub_rank, comm1);
56     }
57     MPI_Finalize();
58 }

代码执行结果是deadlock,这个deadlock本身不难理解,就是comm0依赖comm1,comm1依赖comm2,comm2依赖comm0,循环依赖造成了死锁,互相都等着。

书上只给出来了关键代码的原型,并没有给出来通信域是怎么构造的,Bcast中的sub_rank是怎么得来的,需要补充一些进程组和通信域的知识。

进程组和通信域使得MPI程序与传统的串行程序的设计思路上有很大不同,初学甚至有些别扭,我也没有全部领会。

额外参考了https://www.rc.usf.edu/tutorials/classes/tutorial/mpi/chapter9.html

只能先把与这个例子相关的知识记录下来:

a. 通信域(Communicator)是大的概念,进程组是通信域的一个属性

b. 一个通信域对应一个进程组

c. 一个进程可以在多个进程组中,因此也可以在多个通信域中

d. 通过进程组来构造通信域是通信域的一种生成方式

e. 相同的进程在不同的通信域中的rank是不同的,可以通过函数来查阅这种对应关系

f. 比如,调用MPI_Comm_create函数,利用进程组{0,1}构造一个通信域comm0;那么进程2也会执行这条语句,则进程2中对应的comm0就是MPI_COMM_NULL

关于上面提到的f,我用http://mpitutorial.com/tutorials/introduction-to-groups-and-communicators/里面的一个代码示例进行了验证,加深理解:

 1 #include "mpi.h"
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 
 5 int main(int argc, char *argv[])
 6 {
 7     MPI_Init(&argc, &argv);
 8     int world_rank, world_size;
 9     MPI_Comm_rank(MPI_COMM_WORLD, &world_rank);
10     MPI_Comm_rank(MPI_COMM_WORLD, &world_size);
11 
12     MPI_Group world_group;
13     MPI_Comm_group(MPI_COMM_WORLD, &world_group);
14 
15     int n = 7;
16     const int ranks[7] = {1,2,3,5,7,11,13};
17 
18     MPI_Group prime_group;
19     // 构造新的进程组 这个进程组是客观的 不管当前进程是哪个
20     MPI_Group_incl(world_group, 7, ranks, &prime_group);
21 
22     MPI_Comm prime_comm;
23     // 如果当前进程不在prime_group中 则prime_com就是MPI_COMM_NULL
24     MPI_Comm_create_group(MPI_COMM_WORLD, prime_group, 0, &prime_comm);
25 
26     int prime_rank = -1, prime_size = -1;
27     if (MPI_COMM_NULL!=prime_comm) {
28         MPI_Comm_rank(prime_comm, &prime_rank);
29         MPI_Comm_size(prime_comm, &prime_size);
30     }
31 
32     printf("WORLD RANK/SIZE: %d/%d 	 PRIME RANK/SIZE: %d/%d
",world_rank, world_size, prime_rank, prime_size);
33     if(MPI_COMM_NULL!=prime_comm) MPI_Barrier(prime_comm);
34     MPI_Barrier(MPI_COMM_WORLD);
35     if(MPI_GROUP_NULL!=world_group) MPI_Group_free(&world_group);
36     if(MPI_GROUP_NULL!=prime_group) MPI_Group_free(&prime_group);
37     if(MPI_COMM_NULL!=prime_comm) MPI_Comm_free(&prime_comm);
38     MPI_Finalize();
39 }

代码执行结果如下:

从原来进程中扣出来几个进程构造新进程;对于没被扣出来的那些进程,执行到MPI_Comm_create这里的时候,prime_comm就是MPI_COMM_NULL。

示例6

归约操作。这部分代码都是书上的内容,认真看书即可。

代码1 MINLOC和MAXLOC

有时候不仅需要知道一堆数据中的最小(最大)值,而且需要知道最小最大值对应的编号(或进程号)是多少。

通过MPI提供的MPI_REDUCE归约函数,以及预定义的归约操作MPI_MINLOC活MPI_MAXLOC即可方便实现。

 1 #include "mpi.h"
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 #include <time.h> 
 5 
 6 int main(int argc, char *argv[])
 7 {
 8     MPI_Init(&argc, &argv);
 9     int i, myrank, root;
10     root = 0;
11     double ain[3]; // 各个进程的输入数据
12     double aout[3]; // 只有root进程有用 用于存放规约后的数字的具体值
13     int ind[3]; // 只有root进程有用 用于存放规约后的数字的编号
14     struct{
15         double val;
16         int rank;
17     } in[3], out[3]; // 规约操作的发送缓冲区和接收缓冲区
18     MPI_Comm_rank(MPI_COMM_WORLD, &myrank);
19     srand((unsigned int)(myrank+time(NULL)));
20     for(i=0;i<3;i++) ain[i] = (1.0*rand())/RAND_MAX;
21     for(i=0; i<3; i++){
22         in[i].val = ain[i];
23         in[i].rank = myrank;
24     }
25     printf("myrank:%d %.3f,%.3f,%.3f
",myrank, ain[0],ain[1],ain[2]);
26     MPI_Reduce(in, out, 3, MPI_DOUBLE_INT, MPI_MAXLOC, root, MPI_COMM_WORLD);
27     if (myrank==root) {
28         printf("reduce result:
");
29         for(i=0; i<3; i++){
30             aout[i] = out[i].val;
31             ind[i] = out[i].rank;
32             printf("%.3f of rank %d
", aout[i], ind[i]);
33         }
34     }
35     MPI_Finalize();
36 }

代码执行结果如下:

可以看到这种归约操作:

a. 要求每个进程中的输入数据的大小是一样的(在这个例子中都是长度为3的double数组),

b. 归约的对象是不同进程的数组中相同位置的数据

另外,也允许用户自定义归约操作,如下面代码定义了复数相乘的归约操作(书上只给了代码框架,具体细节中的问题参考http://stackoverflow.com/questions/9285442/mpi-get-processor-with-minimum-value)。

 1 #include "mpi.h"
 2 #include <stdio.h>
 3 #include <stdlib.h>
 4 #include <time.h> 
 5 
 6 typedef struct{
 7     double real;
 8     double imag;
 9 }Complex;
10 
11 void multiplyOP(Complex *, Complex *, int *, MPI_Datatype *);
12 
13 int main(int argc, char *argv[])
14 {
15     MPI_Init(&argc, &argv);
16     int root = 0;
17     int rank;
18     MPI_Comm_rank(MPI_COMM_WORLD, &rank);
19     Complex input[2], output[2];
20     if (0==rank) {
21         input[0].real = 1; input[0].imag = 1;
22         input[1].real = 1; input[1].imag = 2;
23     }
24     if (1==rank) {
25         input[0].real = 1; input[0].imag = -1;
26         input[1].real = 1; input[1].imag = 2;
27     }
28     // 在MPI中注册和构造复数类型
29     MPI_Datatype ctype;
30     MPI_Type_contiguous(2, MPI_DOUBLE, &ctype); // 在MPI中构造复数结构体 连续的两个DOUBLE类型
31     MPI_Type_commit(&ctype); // 在MPI中注册刚构造的复数结构体
32     // 在MPI中构造自定义归约操作
33     MPI_Op myOp;
34     MPI_Op_create((MPI_User_function*)multiplyOP, 1, &myOp); // 生成用户定义的乘积操作
35     MPI_Reduce(input, output, 2, ctype, myOp, root, MPI_COMM_WORLD); // 执行复数乘积操作
36     // 在root中打印结果
37     if (root==rank) {
38         printf("reduce result of 0 is : %1.0f+(%1.0f)i
",output[0].real, output[0].imag);
39         printf("reduce result of 1 is : %1.0f+(%1.0f)i
",output[1].real, output[1].imag);
40     }
41     MPI_Finalize();
42 }
43 
44 // 计算复数的乘积
45 void multiplyOP(Complex *in, Complex *inout, int *len, MPI_Datatype *datatype)
46 {
47     int i;
48     Complex c;
49     for(i=0; i<*len; i++){
50         c.real = inout->real*in->real - inout->imag*in->imag;
51         c.imag = inout->real*in->imag + inout->imag*in->real;
52         *inout = c; // 把计算结果存回到inout的位置
53         in++;
54         inout++;
55     }
56 }

代码的执行结果如下:

原文地址:https://www.cnblogs.com/xbf9xbf/p/5223645.html