source: XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_lib.cpp @ 1187

Last change on this file since 1187 was 1187, checked in by yushan, 7 years ago

add pending list for irecv

File size: 10.6 KB
Line 
1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4#include <iostream>
5#include <fstream>
6
7using namespace std;
8
9
10namespace ep_lib
11{ 
12
13  int tag_combine(int real_tag, int src, int dest)
14  {
15    int a = real_tag << 16;
16    int b = src << 8;
17    int c = dest;
18
19    return a+b+c;
20  }
21
22  int get_ep_rank(MPI_Comm comm, int ep_rank_loc, int mpi_rank)
23  {
24    for(int i=0; i<comm.rank_map->size(); i++)
25    {
26      if(   ( comm.rank_map->at(i).first  == ep_rank_loc )
27         && ( comm.rank_map->at(i).second == mpi_rank ) )
28      {
29        return i;
30      }
31    }
32    printf("rank not find\n");
33  }
34 
35  int get_ep_rank_intercomm(MPI_Comm comm, int ep_rank_loc, int mpi_rank)
36  {
37    // intercomm
38    int inter_rank;
39    for(int i=0; i<comm.ep_comm_ptr->intercomm->intercomm_rank_map->size(); i++)
40    {
41      if(   ( comm.ep_comm_ptr->intercomm->intercomm_rank_map->at(i).first  == ep_rank_loc )
42         && ( comm.ep_comm_ptr->intercomm->intercomm_rank_map->at(i).second == mpi_rank ) )
43      {
44        inter_rank =  i;
45        break;
46      }
47    }
48
49    for(int i=0; i<comm.ep_comm_ptr->intercomm->remote_rank_map->size(); i++)
50    {
51      if(  comm.ep_comm_ptr->intercomm->remote_rank_map->at(i).first  == inter_rank  )
52      {
53        //printf("get_ep_rank for intercomm, ep_rank_loc = %d, mpi_rank = %d => ep_src = %d\n", ep_rank_loc, mpi_rank, i);
54        return i;
55      }
56    }
57
58    printf("rank not find\n");
59   
60  }
61
62
63  int innode_memcpy(int sender, const void* sendbuf, int receiver, void* recvbuf, int count, MPI_Datatype datatype, MPI_Comm comm)
64  {
65    int ep_rank, ep_rank_loc, mpi_rank;
66    int ep_size, num_ep, mpi_size;
67
68    ep_rank = comm.ep_comm_ptr->size_rank_info[0].first;
69    ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first;
70    mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first;
71    ep_size = comm.ep_comm_ptr->size_rank_info[0].second;
72    num_ep = comm.ep_comm_ptr->size_rank_info[1].second;
73    mpi_size = comm.ep_comm_ptr->size_rank_info[2].second;
74
75
76
77    if(datatype == MPI_INT)
78    {
79
80      int* send_buf = static_cast<int*>(const_cast<void*>(sendbuf));
81      int* recv_buf = static_cast<int*>(recvbuf);
82      int* buffer = comm.my_buffer->buf_int;
83
84      for(int j=0; j<count; j+=BUFFER_SIZE)
85      {
86        if(ep_rank_loc == sender)
87        {
88          #pragma omp critical (write_to_buffer)
89          {
90            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
91          }
92          #pragma omp flush
93        }
94
95        MPI_Barrier_local(comm);
96
97
98        if(ep_rank_loc == receiver)
99        {
100          #pragma omp flush
101          #pragma omp critical (read_from_buffer)
102          {
103            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
104          }
105        }
106
107        MPI_Barrier_local(comm);
108      }
109    }
110    else if(datatype == MPI_FLOAT)
111    {
112
113      float* send_buf = static_cast<float*>(const_cast<void*>(sendbuf));
114      float* recv_buf = static_cast<float*>(recvbuf);
115      float* buffer = comm.my_buffer->buf_float;
116
117      for(int j=0; j<count; j+=BUFFER_SIZE)
118      {
119        if(ep_rank_loc == sender)
120        {
121          #pragma omp critical (write_to_buffer)
122          {
123            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
124          }
125          #pragma omp flush
126        }
127
128        MPI_Barrier_local(comm);
129
130
131        if(ep_rank_loc == receiver)
132        {
133          #pragma omp flush
134          #pragma omp critical (read_from_buffer)
135          {
136            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
137          }
138        }
139
140        MPI_Barrier_local(comm);
141      }
142    }
143    else if(datatype == MPI_DOUBLE)
144    {
145
146
147      double* send_buf = static_cast<double*>(const_cast<void*>(sendbuf));
148      double* recv_buf = static_cast<double*>(recvbuf);
149      double* buffer = comm.my_buffer->buf_double;
150
151      for(int j=0; j<count; j+=BUFFER_SIZE)
152      {
153        if(ep_rank_loc == sender)
154        {
155          #pragma omp critical (write_to_buffer)
156          {
157            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
158          }
159          #pragma omp flush
160        }
161
162        MPI_Barrier_local(comm);
163
164
165        if(ep_rank_loc == receiver)
166        {
167          #pragma omp flush
168          #pragma omp critical (read_from_buffer)
169          {
170            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
171          }
172        }
173
174        MPI_Barrier_local(comm);
175      }
176    }
177    else if(datatype == MPI_LONG)
178    {
179      long* send_buf = static_cast<long*>(const_cast<void*>(sendbuf));
180      long* recv_buf = static_cast<long*>(recvbuf);
181      long* buffer = comm.my_buffer->buf_long;
182
183      for(int j=0; j<count; j+=BUFFER_SIZE)
184      {
185        if(ep_rank_loc == sender)
186        {
187          #pragma omp critical (write_to_buffer)
188          {
189            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
190          }
191          #pragma omp flush
192        }
193
194        MPI_Barrier_local(comm);
195
196
197        if(ep_rank_loc == receiver)
198        {
199          #pragma omp flush
200          #pragma omp critical (read_from_buffer)
201          {
202            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
203          }
204        }
205
206        MPI_Barrier_local(comm);
207      }
208    }
209    else if(datatype == MPI_UNSIGNED_LONG)
210    {
211      unsigned long* send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf));
212      unsigned long* recv_buf = static_cast<unsigned long*>(recvbuf);
213      unsigned long* buffer = comm.my_buffer->buf_ulong;
214
215      for(int j=0; j<count; j+=BUFFER_SIZE)
216      {
217        if(ep_rank_loc == sender)
218        {
219          #pragma omp critical (write_to_buffer)
220          {
221            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
222          }
223          #pragma omp flush
224        }
225
226        MPI_Barrier_local(comm);
227
228
229        if(ep_rank_loc == receiver)
230        {
231          #pragma omp flush
232          #pragma omp critical (read_from_buffer)
233          {
234            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
235          }
236        }
237
238        MPI_Barrier_local(comm);
239      }
240    }
241    else if(datatype == MPI_CHAR)
242    {
243      char* send_buf = static_cast<char*>(const_cast<void*>(sendbuf));
244      char* recv_buf = static_cast<char*>(recvbuf);
245      char* buffer = comm.my_buffer->buf_char;
246
247      for(int j=0; j<count; j+=BUFFER_SIZE)
248      {
249        if(ep_rank_loc == sender)
250        {
251          #pragma omp critical (write_to_buffer)
252          {
253            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
254          }
255          #pragma omp flush
256        }
257
258        MPI_Barrier_local(comm);
259
260
261        if(ep_rank_loc == receiver)
262        {
263          #pragma omp flush
264          #pragma omp critical (read_from_buffer)
265          {
266            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
267          }
268        }
269
270        MPI_Barrier_local(comm);
271      }
272    }
273    else
274    {
275      printf("datatype not supported!!\n");
276      exit(1);
277    }
278    return 0;
279  }
280
281
282  int MPI_Get_count(const MPI_Status *status, MPI_Datatype datatype, int *count)
283  {
284/*
285    ::MPI_Aint datasize, char_size, lb;
286
287    ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize);
288    ::MPI_Type_get_extent(MPI_CHAR, &lb, &char_size);
289
290    *count = status->char_count / ( datasize/ char_size);
291
292    //printf("MPI_Get_count, status_count  = %d\n", *count);
293    return 0;
294*/
295    ::MPI_Status *mpi_status = static_cast< ::MPI_Status* >(status->mpi_status);
296    ::MPI_Datatype mpi_datatype = static_cast< ::MPI_Datatype >(datatype);
297
298    ::MPI_Get_count(mpi_status, mpi_datatype, count);
299  }
300
301  double MPI_Wtime()
302  {
303    return ::MPI_Wtime();
304
305  }
306
307  void check_sum_send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, int type)
308  {
309    int src_rank;
310    int int_count;
311    ::MPI_Aint datasize, intsize, charsize, lb;
312   
313    ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize);
314    ::MPI_Type_get_extent(MPI_CHAR_STD, &lb, &intsize);
315
316    int_count = count * datasize / intsize ;
317
318    char *buffer = static_cast< char* >(const_cast< void*> (buf));
319   
320    unsigned long sum = 0;
321    for(int i = 0; i<int_count; i++)
322      sum += *(buffer+i); 
323
324
325    MPI_Comm_rank(comm, &src_rank);
326   
327    ofstream myfile;
328    myfile.open ("send_log.txt", ios::app);
329    if (myfile.is_open())
330    {
331      myfile << "type = " << type << " src = "<< src_rank<< " dest = "<< dest <<" tag = "<< tag << "  count = "<< count << " sum = "<< sum << "\n";
332      myfile.close(); 
333    }
334    else printf("Unable to open file\n");
335
336  }
337
338
339  void check_sum_recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, int type)
340  {
341    int dest_rank;
342    int int_count;
343    ::MPI_Aint datasize, intsize, charsize, lb;
344   
345    ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize);
346    ::MPI_Type_get_extent(MPI_CHAR_STD, &lb, &intsize);
347
348    int_count = count * datasize / intsize ;
349
350    char *buffer = static_cast< char* >(buf);
351   
352    unsigned long sum = 0;
353    for(int i = 0; i<int_count; i++)
354      sum += *(buffer+i); 
355
356
357    MPI_Comm_rank(comm, &dest_rank);
358   
359    ofstream myfile;
360    myfile.open ("recv_log.txt", ios::app);
361    if (myfile.is_open())
362    {
363      myfile << "type = " << type << " src = "<< src << " dest = "<< dest_rank <<" tag = "<< tag << "  count = "<< count << " sum = "<< sum << "\n";
364      myfile.close(); 
365    }
366    else printf("Unable to open file\n");
367
368  }
369
370  int test_sendrecv(MPI_Comm comm)
371  {
372    int myRank;
373    MPI_Comm_rank(comm, &myRank);
374    bool amClient = false;
375    bool amServer = false;
376    if(myRank<=3) amClient = true;
377    else amServer = true;
378
379    if(amServer)
380    {
381      int send_buf[4];
382      MPI_Request send_request[8];
383      MPI_Status send_status[8];
384
385     
386     
387      for(int j=0; j<4; j++)  // 4 buffers
388      {
389        for(int i=0; i<2; i++)
390        {
391          send_buf[j] = (myRank+1)*100 + j;
392          MPI_Isend(&send_buf[j], 1, MPI_INT, i*2, 9999, comm, &send_request[i*4+j]);
393        }
394      }
395     
396
397      MPI_Waitall(8, send_request, send_status);
398    }
399
400
401    if(amClient&&myRank%2==0) // Clients leaders
402    {
403      int recv_buf[8];
404      MPI_Request recv_request[8];
405      MPI_Status recv_status[8];
406
407      for(int i=0; i<2; i++)  // 2 servers
408      {
409        for(int j=0; j<4; j++)
410        {
411          MPI_Irecv(&recv_buf[i*4+j], 1, MPI_INT, i+4, 9999, comm, &recv_request[i*4+j]);
412        }
413      }
414
415      MPI_Waitall(8, recv_request, recv_status);
416      printf("============ client %d, recv_buf = %d, %d, %d, %d, %d, %d, %d, %d ================\n", 
417              myRank, recv_buf[0], recv_buf[1], recv_buf[2], recv_buf[3], recv_buf[4], recv_buf[5], recv_buf[6], recv_buf[7]);
418    }
419
420    MPI_Barrier(comm);
421
422  }
423
424}
425
426
427
428
429
430
Note: See TracBrowser for help on using the repository browser.