source: XIOS/dev/branch_yushan/extern/src_ep_dev/ep_lib.cpp @ 1037

Last change on this file since 1037 was 1037, checked in by yushan, 7 years ago

initialize the branch

File size: 9.3 KB
Line 
1#include "ep_lib.hpp"
2#include <mpi.h>
3#include "ep_declaration.hpp"
4#include <iostream>
5#include <fstream>
6
7using namespace std;
8
9
10namespace ep_lib
11{
12
13
14  int tag_combine(int real_tag, int src, int dest)
15  {
16    int a = real_tag << 16;
17    int b = src << 8;
18    int c = dest;
19
20    return a+b+c;
21  }
22
23  int get_ep_rank(MPI_Comm comm, int ep_rank_loc, int mpi_rank)
24  {
25    for(int i=0; i<comm.rank_map->size(); i++)
26    {
27      if(   ( comm.rank_map->at(i).first  == ep_rank_loc )
28         && ( comm.rank_map->at(i).second == mpi_rank ) )
29      {
30        return i;
31      }
32    }
33    printf("rank not find\n");
34  }
35 
36  int get_ep_rank_intercomm(MPI_Comm comm, int ep_rank_loc, int mpi_rank)
37  {
38    // intercomm
39    int inter_rank;
40    for(int i=0; i<comm.ep_comm_ptr->intercomm->intercomm_rank_map->size(); i++)
41    {
42      if(   ( comm.ep_comm_ptr->intercomm->intercomm_rank_map->at(i).first  == ep_rank_loc )
43         && ( comm.ep_comm_ptr->intercomm->intercomm_rank_map->at(i).second == mpi_rank ) )
44      {
45        inter_rank =  i;
46        break;
47      }
48    }
49
50    for(int i=0; i<comm.ep_comm_ptr->intercomm->remote_rank_map->size(); i++)
51    {
52      if(  comm.ep_comm_ptr->intercomm->remote_rank_map->at(i).first  == inter_rank  )
53      {
54        printf("get_ep_rank for intercomm, ep_rank_loc = %d, mpi_rank = %d => ep_src = %d\n", ep_rank_loc, mpi_rank, i);
55        return i;
56      }
57    }
58
59    printf("rank not find\n");
60   
61  }
62
63
64  int innode_memcpy(int sender, const void* sendbuf, int receiver, void* recvbuf, int count, MPI_Datatype datatype, MPI_Comm comm)
65  {
66    int ep_rank, ep_rank_loc, mpi_rank;
67    int ep_size, num_ep, mpi_size;
68
69    ep_rank = comm.ep_comm_ptr->size_rank_info[0].first;
70    ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first;
71    mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first;
72    ep_size = comm.ep_comm_ptr->size_rank_info[0].second;
73    num_ep = comm.ep_comm_ptr->size_rank_info[1].second;
74    mpi_size = comm.ep_comm_ptr->size_rank_info[2].second;
75
76
77
78    if(datatype == MPI_INT)
79    {
80
81      int* send_buf = static_cast<int*>(const_cast<void*>(sendbuf));
82      int* recv_buf = static_cast<int*>(recvbuf);
83      int* buffer = comm.my_buffer->buf_int;
84
85      for(int j=0; j<count; j+=BUFFER_SIZE)
86      {
87        if(ep_rank_loc == sender)
88        {
89          #pragma omp critical (write_to_buffer)
90          {
91            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
92          }
93          #pragma omp flush
94        }
95
96        MPI_Barrier_local(comm);
97
98
99        if(ep_rank_loc == receiver)
100        {
101          #pragma omp flush
102          #pragma omp critical (read_from_buffer)
103          {
104            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
105          }
106        }
107
108        MPI_Barrier_local(comm);
109      }
110    }
111    else if(datatype == MPI_FLOAT)
112    {
113
114      float* send_buf = static_cast<float*>(const_cast<void*>(sendbuf));
115      float* recv_buf = static_cast<float*>(recvbuf);
116      float* buffer = comm.my_buffer->buf_float;
117
118      for(int j=0; j<count; j+=BUFFER_SIZE)
119      {
120        if(ep_rank_loc == sender)
121        {
122          #pragma omp critical (write_to_buffer)
123          {
124            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
125          }
126          #pragma omp flush
127        }
128
129        MPI_Barrier_local(comm);
130
131
132        if(ep_rank_loc == receiver)
133        {
134          #pragma omp flush
135          #pragma omp critical (read_from_buffer)
136          {
137            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
138          }
139        }
140
141        MPI_Barrier_local(comm);
142      }
143    }
144    else if(datatype == MPI_DOUBLE)
145    {
146
147
148      double* send_buf = static_cast<double*>(const_cast<void*>(sendbuf));
149      double* recv_buf = static_cast<double*>(recvbuf);
150      double* buffer = comm.my_buffer->buf_double;
151
152      for(int j=0; j<count; j+=BUFFER_SIZE)
153      {
154        if(ep_rank_loc == sender)
155        {
156          #pragma omp critical (write_to_buffer)
157          {
158            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
159          }
160          #pragma omp flush
161        }
162
163        MPI_Barrier_local(comm);
164
165
166        if(ep_rank_loc == receiver)
167        {
168          #pragma omp flush
169          #pragma omp critical (read_from_buffer)
170          {
171            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
172          }
173        }
174
175        MPI_Barrier_local(comm);
176      }
177    }
178    else if(datatype == MPI_LONG)
179    {
180      long* send_buf = static_cast<long*>(const_cast<void*>(sendbuf));
181      long* recv_buf = static_cast<long*>(recvbuf);
182      long* buffer = comm.my_buffer->buf_long;
183
184      for(int j=0; j<count; j+=BUFFER_SIZE)
185      {
186        if(ep_rank_loc == sender)
187        {
188          #pragma omp critical (write_to_buffer)
189          {
190            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
191          }
192          #pragma omp flush
193        }
194
195        MPI_Barrier_local(comm);
196
197
198        if(ep_rank_loc == receiver)
199        {
200          #pragma omp flush
201          #pragma omp critical (read_from_buffer)
202          {
203            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
204          }
205        }
206
207        MPI_Barrier_local(comm);
208      }
209    }
210    else if(datatype == MPI_UNSIGNED_LONG)
211    {
212      unsigned long* send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf));
213      unsigned long* recv_buf = static_cast<unsigned long*>(recvbuf);
214      unsigned long* buffer = comm.my_buffer->buf_ulong;
215
216      for(int j=0; j<count; j+=BUFFER_SIZE)
217      {
218        if(ep_rank_loc == sender)
219        {
220          #pragma omp critical (write_to_buffer)
221          {
222            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
223          }
224          #pragma omp flush
225        }
226
227        MPI_Barrier_local(comm);
228
229
230        if(ep_rank_loc == receiver)
231        {
232          #pragma omp flush
233          #pragma omp critical (read_from_buffer)
234          {
235            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
236          }
237        }
238
239        MPI_Barrier_local(comm);
240      }
241    }
242    else if(datatype == MPI_CHAR)
243    {
244      char* send_buf = static_cast<char*>(const_cast<void*>(sendbuf));
245      char* recv_buf = static_cast<char*>(recvbuf);
246      char* buffer = comm.my_buffer->buf_char;
247
248      for(int j=0; j<count; j+=BUFFER_SIZE)
249      {
250        if(ep_rank_loc == sender)
251        {
252          #pragma omp critical (write_to_buffer)
253          {
254            copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer);
255          }
256          #pragma omp flush
257        }
258
259        MPI_Barrier_local(comm);
260
261
262        if(ep_rank_loc == receiver)
263        {
264          #pragma omp flush
265          #pragma omp critical (read_from_buffer)
266          {
267            copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j);
268          }
269        }
270
271        MPI_Barrier_local(comm);
272      }
273    }
274    else
275    {
276      printf("datatype not supported!!\n");
277      exit(1);
278    }
279    return 0;
280  }
281
282
283  int MPI_Get_count(const MPI_Status *status, MPI_Datatype datatype, int *count)
284  {
285/*
286    ::MPI_Aint datasize, char_size, lb;
287
288    ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize);
289    ::MPI_Type_get_extent(MPI_CHAR, &lb, &char_size);
290
291    *count = status->char_count / ( datasize/ char_size);
292
293    //printf("MPI_Get_count, status_count  = %d\n", *count);
294    return 0;
295*/
296    ::MPI_Status *mpi_status = static_cast< ::MPI_Status* >(status->mpi_status);
297    ::MPI_Datatype mpi_datatype = static_cast< ::MPI_Datatype >(datatype);
298
299    ::MPI_Get_count(mpi_status, mpi_datatype, count);
300  }
301
302  double MPI_Wtime()
303  {
304    return ::MPI_Wtime();
305
306  }
307
308  void check_sum_send(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, int type)
309  {
310    int src_rank;
311    int int_count;
312    ::MPI_Aint datasize, intsize, charsize, lb;
313   
314    ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize);
315    ::MPI_Type_get_extent(MPI_CHAR_STD, &lb, &intsize);
316
317    int_count = count * datasize / intsize ;
318
319    char *buffer = static_cast< char* >(const_cast< void*> (buf));
320   
321    unsigned long sum = 0;
322    for(int i = 0; i<int_count; i++)
323      sum += *(buffer+i); 
324
325
326    MPI_Comm_rank(comm, &src_rank);
327   
328    ofstream myfile;
329    myfile.open ("send_log.txt", ios::app);
330    if (myfile.is_open())
331    {
332      myfile << "type = " << type << " src = "<< src_rank<< " dest = "<< dest <<" tag = "<< tag << "  count = "<< count << " sum = "<< sum << "\n";
333      myfile.close(); 
334    }
335    else printf("Unable to open file\n");
336
337  }
338
339
340  void check_sum_recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, int type)
341  {
342    int dest_rank;
343    int int_count;
344    ::MPI_Aint datasize, intsize, charsize, lb;
345   
346    ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize);
347    ::MPI_Type_get_extent(MPI_CHAR_STD, &lb, &intsize);
348
349    int_count = count * datasize / intsize ;
350
351    char *buffer = static_cast< char* >(buf);
352   
353    unsigned long sum = 0;
354    for(int i = 0; i<int_count; i++)
355      sum += *(buffer+i); 
356
357
358    MPI_Comm_rank(comm, &dest_rank);
359   
360    ofstream myfile;
361    myfile.open ("recv_log.txt", ios::app);
362    if (myfile.is_open())
363    {
364      myfile << "type = " << type << " src = "<< src << " dest = "<< dest_rank <<" tag = "<< tag << "  count = "<< count << " sum = "<< sum << "\n";
365      myfile.close(); 
366    }
367    else printf("Unable to open file\n");
368
369  }
370}
371
372
373
374
375
376
Note: See TracBrowser for help on using the repository browser.