Changeset 1289
- Timestamp:
- 10/04/17 17:02:13 (6 years ago)
- Location:
- XIOS/dev/branch_openmp/extern
- Files:
-
- 25 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_openmp/extern/remap/src/mpi_routing.cpp
r1134 r1289 127 127 MPI_Info info_null; 128 128 129 MPI_Alloc_mem(nbTarget *sizeof(int), info_null, &targetRank); 130 MPI_Alloc_mem(nbSource *sizeof(int), info_null, &sourceRank); 129 // MPI_Alloc_mem(nbTarget *sizeof(int), info_null, &targetRank); 130 // MPI_Alloc_mem(nbSource *sizeof(int), info_null, &sourceRank); 131 MPI_Alloc_mem(nbTarget *sizeof(int), MPI_INFO_NULL, &targetRank); 132 MPI_Alloc_mem(nbSource *sizeof(int), MPI_INFO_NULL, &sourceRank); 131 133 132 134 targetRankToIndex = new int[mpiSize]; -
XIOS/dev/branch_openmp/extern/remap/src/polyg.cpp
r1220 r1289 180 180 assert(sc >= -1e-10); // Error: tri a l'env (wrong orientation) 181 181 double area_gc = triarea(t[0], t[1], t[2]); 182 if(area_gc<=0) printf("area_gc = %e\n", area_gc);182 //if(area_gc<=0) printf("area_gc = %e\n", area_gc); 183 183 double area_sc_gc_moon = 0; 184 184 if (d[i]) /* handle small circle case */ -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_allgatherv.cpp
r1287 r1289 62 62 63 63 MPI_Gather_local(&sendcount, 1, MPI_INT, local_recvcounts.data(), 0, comm); 64 for(int i=1; i<num_ep; i++) local_displs[i] = local_displs[i-1] + local_recvcounts[i ];64 for(int i=1; i<num_ep; i++) local_displs[i] = local_displs[i-1] + local_recvcounts[i-1]; 65 65 66 66 67 67 if(is_master) 68 68 { 69 local_recvbuf = new void*[datasize * num_ep * count];69 local_recvbuf = new void*[datasize * std::accumulate(local_recvcounts.begin(), local_recvcounts.end(), 0)]; 70 70 tmp_recvbuf = new void*[datasize * std::accumulate(recvcounts, recvcounts+ep_size, 0)]; 71 71 } … … 81 81 std::vector<int>mpi_displs(mpi_size, 0); 82 82 83 int local_sendcount = num_ep * count;83 int local_sendcount = std::accumulate(local_recvcounts.begin(), local_recvcounts.end(), 0); 84 84 MPI_Allgather(&local_sendcount, 1, MPI_INT, mpi_recvcounts.data(), 1, MPI_INT, to_mpi_comm(comm.mpi_comm)); 85 85 … … 99 99 100 100 101 ::MPI_Allgatherv(local_recvbuf, num_ep *count, to_mpi_type(datatype), tmp_recvbuf, mpi_recvcounts.data(), mpi_displs.data(), to_mpi_type(datatype), to_mpi_comm(comm.mpi_comm));101 ::MPI_Allgatherv(local_recvbuf, local_sendcount, to_mpi_type(datatype), tmp_recvbuf, mpi_recvcounts.data(), mpi_displs.data(), to_mpi_type(datatype), to_mpi_comm(comm.mpi_comm)); 102 102 103 103 // if(ep_rank == 0) -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_alltoall.cpp
r1287 r1289 1 1 #include "ep_lib.hpp" 2 2 #include <mpi.h> 3 #include "ep_mpi.hpp" 4 3 //#include "ep_declaration.hpp" 5 4 6 5 namespace ep_lib 7 6 { 8 7 8 9 9 10 int MPI_Alltoall(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) 10 11 { 11 if(!comm.is_ep) 12 { 13 return ::MPI_Alltoall(sendbuf, sendcount, to_mpi_type(sendtype), recvbuf, recvcount, to_mpi_type(recvtype), to_mpi_comm(comm.mpi_comm)); 14 } 12 assert(static_cast< ::MPI_Datatype>(sendtype) == static_cast< ::MPI_Datatype>(recvtype)); 13 ::MPI_Aint typesize, llb; 14 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(sendtype), &llb, &typesize); 15 16 int ep_size; 17 MPI_Comm_size(comm, &ep_size); 18 15 19 16 17 assert(valid_type(sendtype) && valid_type(recvtype));18 assert(sendcount == recvcount);19 20 ::MPI_Aint datasize, llb;21 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(sendtype), &llb, &datasize);22 23 int count = sendcount;24 25 int ep_rank = comm.ep_comm_ptr->size_rank_info[0].first;26 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first;27 int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first;28 int ep_size = comm.ep_comm_ptr->size_rank_info[0].second;29 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second;30 int mpi_size = comm.ep_comm_ptr->size_rank_info[2].second;31 32 void* tmp_recvbuf;33 if(ep_rank == 0) tmp_recvbuf = new void*[count * ep_size * ep_size * datasize];34 35 MPI_Gather(sendbuf, count*ep_size, sendtype, tmp_recvbuf, count*ep_size, recvtype, 0, comm);36 37 38 39 // reorder tmp_buf40 void* tmp_sendbuf;41 if(ep_rank == 0) tmp_sendbuf = new void*[count * ep_size * ep_size * datasize];42 43 if(ep_rank == 0)44 20 for(int i=0; i<ep_size; i++) 45 21 { 46 for(int j=0; j<ep_size; j++) 47 { 48 //printf("tmp_recv[%d] = tmp_send[%d]\n", i*ep_size*count + j*count, j*ep_size*count + i*count); 49 50 memcpy(tmp_sendbuf + j*ep_size*count*datasize + i*count*datasize, tmp_recvbuf + i*ep_size*count*datasize + j*count*datasize, count*datasize); 51 } 22 ep_lib::MPI_Gather(sendbuf+i*sendcount*typesize, sendcount, sendtype, recvbuf, recvcount, recvtype, i, comm); 52 23 } 53 54 MPI_Scatter(tmp_sendbuf, ep_size*count, sendtype, recvbuf, ep_size*recvcount, recvtype, 0, comm); 55 56 if(ep_rank == 0) 57 { 58 delete[] tmp_recvbuf; 59 delete[] tmp_sendbuf; 60 } 24 61 25 62 26 return 0; -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_bcast.cpp
r1287 r1289 11 11 #include "ep_mpi.hpp" 12 12 13 13 14 using namespace std; 15 14 16 15 17 namespace ep_lib 16 18 { 19 20 int MPI_Bcast_local(void *buffer, int count, MPI_Datatype datatype, int local_root, MPI_Comm comm) 21 { 22 assert(valid_type(datatype)); 23 24 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 25 26 ::MPI_Aint datasize, lb; 27 ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 28 29 30 if(ep_rank_loc == local_root) 31 { 32 //comm.ep_comm_ptr->comm_list->collective_buffer[local_root] = buffer; 33 comm.my_buffer->void_buffer[local_root] = buffer; 34 } 35 36 // #pragma omp flush 37 MPI_Barrier_local(comm); 38 // #pragma omp flush 39 40 if(ep_rank_loc != local_root) 41 { 42 #pragma omp critical (_bcast) 43 memcpy(buffer, comm.my_buffer->void_buffer[local_root], datasize * count); 44 //memcpy(buffer, comm.ep_comm_ptr->comm_list->collective_buffer[local_root], datasize * count); 45 } 46 47 MPI_Barrier_local(comm); 48 } 17 49 18 50 int MPI_Bcast(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm) … … 49 81 50 82 51 int MPI_Bcast_local(void *buffer, int count, MPI_Datatype datatype, int local_root, MPI_Comm comm) 52 { 53 assert(valid_type(datatype)); 54 55 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 56 57 ::MPI_Aint datasize, lb; 58 ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 59 60 61 if(ep_rank_loc == local_root) 62 { 63 comm.my_buffer->void_buffer[local_root] = buffer; 64 } 65 66 // #pragma omp flush 67 MPI_Barrier_local(comm); 68 // #pragma omp flush 69 70 if(ep_rank_loc != local_root) 71 { 72 #pragma omp critical (_bcast) 73 memcpy(buffer, comm.my_buffer->void_buffer[local_root], datasize * count); 74 } 75 76 MPI_Barrier_local(comm); 77 } 83 84 85 86 int MPI_Bcast_local2(void *buffer, int count, MPI_Datatype datatype, MPI_Comm comm) 87 { 88 if(datatype == MPI_INT) 89 { 90 return MPI_Bcast_local_int(buffer, count, comm); 91 } 92 else if(datatype == MPI_FLOAT) 93 { 94 return MPI_Bcast_local_float(buffer, count, comm); 95 } 96 else if(datatype == MPI_DOUBLE) 97 { 98 return MPI_Bcast_local_double(buffer, count, comm); 99 } 100 else if(datatype == MPI_CHAR) 101 { 102 return MPI_Bcast_local_char(buffer, count, comm); 103 } 104 else if(datatype == MPI_LONG) 105 { 106 return MPI_Bcast_local_long(buffer, count, comm); 107 } 108 else if(datatype == MPI_UNSIGNED_LONG) 109 { 110 return MPI_Bcast_local_char(buffer, count, comm); 111 } 112 else 113 { 114 printf("MPI_Bcast Datatype not supported!\n"); 115 exit(0); 116 } 117 } 118 119 int MPI_Bcast_local_int(void *buf, int count, MPI_Comm comm) 120 { 121 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 122 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 123 124 int *buffer = comm.my_buffer->buf_int; 125 int *tmp = static_cast<int*>(buf); 126 127 for(int j=0; j<count; j+=BUFFER_SIZE) 128 { 129 if(my_rank == 0) 130 { 131 #pragma omp critical (write_to_buffer) 132 { 133 copy(tmp+j, tmp+j+min(BUFFER_SIZE, count-j), buffer); 134 } 135 #pragma omp flush 136 } 137 138 MPI_Barrier_local(comm); 139 140 141 142 if(my_rank != 0) 143 { 144 #pragma omp flush 145 #pragma omp critical (read_from_buffer) 146 { 147 copy(buffer, buffer+min(BUFFER_SIZE, count-j), tmp+j); 148 } 149 } 150 151 MPI_Barrier_local(comm); 152 } 153 } 154 155 int MPI_Bcast_local_float(void *buf, int count, MPI_Comm comm) 156 { 157 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 158 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 159 160 float *buffer = comm.my_buffer->buf_float; 161 float *tmp = static_cast<float*>(buf); 162 163 for(int j=0; j<count; j+=BUFFER_SIZE) 164 { 165 if(my_rank == 0) 166 { 167 #pragma omp critical (write_to_buffer) 168 { 169 copy(tmp+j, tmp+j+min(BUFFER_SIZE, count-j), buffer); 170 } 171 #pragma omp flush 172 } 173 174 MPI_Barrier_local(comm); 175 176 177 if(my_rank != 0) 178 { 179 #pragma omp flush 180 #pragma omp critical (read_from_buffer) 181 { 182 copy(buffer, buffer+min(BUFFER_SIZE, count-j), tmp+j); 183 } 184 } 185 186 MPI_Barrier_local(comm); 187 } 188 } 189 190 int MPI_Bcast_local_double(void *buf, int count, MPI_Comm comm) 191 { 192 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 193 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 194 195 double *buffer = comm.my_buffer->buf_double; 196 double *tmp = static_cast<double*>(buf); 197 198 for(int j=0; j<count; j+=BUFFER_SIZE) 199 { 200 if(my_rank == 0) 201 { 202 #pragma omp critical (write_to_buffer) 203 { 204 copy(tmp+j, tmp+j+min(BUFFER_SIZE, count-j), buffer); 205 } 206 #pragma omp flush 207 } 208 209 MPI_Barrier_local(comm); 210 211 212 if(my_rank != 0) 213 { 214 #pragma omp flush 215 #pragma omp critical (read_from_buffer) 216 { 217 copy(buffer, buffer+min(BUFFER_SIZE, count-j), tmp+j); 218 } 219 } 220 221 MPI_Barrier_local(comm); 222 } 223 } 224 225 226 int MPI_Bcast_local_char(void *buf, int count, MPI_Comm comm) 227 { 228 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 229 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 230 231 char *buffer = comm.my_buffer->buf_char; 232 char *tmp = static_cast<char*>(buf); 233 234 for(int j=0; j<count; j+=BUFFER_SIZE) 235 { 236 if(my_rank == 0) 237 { 238 #pragma omp critical (write_to_buffer) 239 { 240 copy(tmp+j, tmp+j+min(BUFFER_SIZE, count-j), buffer); 241 } 242 #pragma omp flush 243 } 244 245 MPI_Barrier_local(comm); 246 247 248 if(my_rank != 0) 249 { 250 #pragma omp flush 251 #pragma omp critical (read_from_buffer) 252 { 253 copy(buffer, buffer+min(BUFFER_SIZE, count-j), tmp+j); 254 } 255 } 256 257 MPI_Barrier_local(comm); 258 } 259 } 260 261 int MPI_Bcast_local_long(void *buf, int count, MPI_Comm comm) 262 { 263 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 264 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 265 266 long *buffer = comm.my_buffer->buf_long; 267 long *tmp = static_cast<long*>(buf); 268 269 for(int j=0; j<count; j+=BUFFER_SIZE) 270 { 271 if(my_rank == 0) 272 { 273 #pragma omp critical (write_to_buffer) 274 { 275 copy(tmp+j, tmp+j+min(BUFFER_SIZE, count-j), buffer); 276 } 277 #pragma omp flush 278 } 279 280 MPI_Barrier_local(comm); 281 282 283 if(my_rank != 0) 284 { 285 #pragma omp flush 286 #pragma omp critical (read_from_buffer) 287 { 288 copy(buffer, buffer+min(BUFFER_SIZE, count-j), tmp+j); 289 } 290 } 291 292 MPI_Barrier_local(comm); 293 } 294 } 295 296 int MPI_Bcast_local_ulong(void *buf, int count, MPI_Comm comm) 297 { 298 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 299 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 300 301 unsigned long *buffer = comm.my_buffer->buf_ulong; 302 unsigned long *tmp = static_cast<unsigned long*>(buf); 303 304 for(int j=0; j<count; j+=BUFFER_SIZE) 305 { 306 if(my_rank == 0) 307 { 308 #pragma omp critical (write_to_buffer) 309 { 310 copy(tmp+j, tmp+j+min(BUFFER_SIZE, count-j), buffer); 311 } 312 #pragma omp flush 313 } 314 315 MPI_Barrier_local(comm); 316 317 318 if(my_rank != 0) 319 { 320 #pragma omp flush 321 #pragma omp critical (read_from_buffer) 322 { 323 copy(buffer, buffer+min(BUFFER_SIZE, count-j), tmp+j); 324 } 325 } 326 327 MPI_Barrier_local(comm); 328 } 329 } 330 331 332 int MPI_Bcast2(void *buffer, int count, MPI_Datatype datatype, int root, MPI_Comm comm) 333 { 334 335 if(!comm.is_ep) 336 { 337 ::MPI_Bcast(buffer, count, static_cast< ::MPI_Datatype>(datatype), root, static_cast< ::MPI_Comm>(comm.mpi_comm)); 338 return 0; 339 } 340 341 342 int ep_rank, ep_rank_loc, mpi_rank; 343 int ep_size, num_ep, mpi_size; 344 345 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 346 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 347 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 348 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 349 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 350 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 351 352 353 354 int root_mpi_rank = comm.rank_map->at(root).second; 355 int root_ep_rank_loc = comm.rank_map->at(root).first; 356 357 358 // if root is not master thread, send first to master 359 if(root_ep_rank_loc != 0 && mpi_rank == root_mpi_rank) 360 { 361 innode_memcpy(root_ep_rank_loc, buffer, 0, buffer, count, datatype, comm); 362 } 363 364 365 if(ep_rank_loc==0) 366 { 367 ::MPI_Bcast(buffer, count, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 368 } 369 370 MPI_Bcast_local2(buffer, count, datatype, comm); 371 372 return 0; 373 } 374 78 375 79 376 } -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_create.cpp
r1287 r1289 63 63 64 64 out_comm_hdls[0].my_buffer = new BUFFER; 65 //out_comm_hdls[0].my_buffer->buf_double = new double[BUFFER_SIZE];66 //out_comm_hdls[0].my_buffer->buf_float = new float[BUFFER_SIZE];67 //out_comm_hdls[0].my_buffer->buf_int = new int[BUFFER_SIZE];68 //out_comm_hdls[0].my_buffer->buf_long = new long[BUFFER_SIZE];69 //out_comm_hdls[0].my_buffer->buf_ulong = new unsigned long[BUFFER_SIZE];70 //out_comm_hdls[0].my_buffer->buf_char = new char[BUFFER_SIZE];65 out_comm_hdls[0].my_buffer->buf_double = new double[BUFFER_SIZE]; 66 out_comm_hdls[0].my_buffer->buf_float = new float[BUFFER_SIZE]; 67 out_comm_hdls[0].my_buffer->buf_int = new int[BUFFER_SIZE]; 68 out_comm_hdls[0].my_buffer->buf_long = new long[BUFFER_SIZE]; 69 out_comm_hdls[0].my_buffer->buf_ulong = new unsigned long[BUFFER_SIZE]; 70 out_comm_hdls[0].my_buffer->buf_char = new char[BUFFER_SIZE]; 71 71 72 72 out_comm_hdls[0].rank_map = new RANK_MAP; … … 145 145 146 146 out_comm_hdls[0].my_buffer = new BUFFER; 147 //out_comm_hdls[0].my_buffer->buf_double = new double[BUFFER_SIZE];148 //out_comm_hdls[0].my_buffer->buf_float = new float[BUFFER_SIZE];149 //out_comm_hdls[0].my_buffer->buf_int = new int[BUFFER_SIZE];150 //out_comm_hdls[0].my_buffer->buf_long = new long[BUFFER_SIZE];151 //out_comm_hdls[0].my_buffer->buf_ulong = new unsigned long[BUFFER_SIZE];152 //out_comm_hdls[0].my_buffer->buf_char = new char[BUFFER_SIZE];147 out_comm_hdls[0].my_buffer->buf_double = new double[BUFFER_SIZE]; 148 out_comm_hdls[0].my_buffer->buf_float = new float[BUFFER_SIZE]; 149 out_comm_hdls[0].my_buffer->buf_int = new int[BUFFER_SIZE]; 150 out_comm_hdls[0].my_buffer->buf_long = new long[BUFFER_SIZE]; 151 out_comm_hdls[0].my_buffer->buf_ulong = new unsigned long[BUFFER_SIZE]; 152 out_comm_hdls[0].my_buffer->buf_char = new char[BUFFER_SIZE]; 153 153 154 154 out_comm_hdls[0].rank_map = new RANK_MAP; … … 233 233 234 234 out_comm_hdls[0].my_buffer = new BUFFER; 235 //out_comm_hdls[0].my_buffer->buf_double = new double[BUFFER_SIZE];236 //out_comm_hdls[0].my_buffer->buf_float = new float[BUFFER_SIZE];237 //out_comm_hdls[0].my_buffer->buf_int = new int[BUFFER_SIZE];238 //out_comm_hdls[0].my_buffer->buf_long = new long[BUFFER_SIZE];239 //out_comm_hdls[0].my_buffer->buf_ulong = new unsigned long[BUFFER_SIZE];240 //out_comm_hdls[0].my_buffer->buf_char = new char[BUFFER_SIZE];235 out_comm_hdls[0].my_buffer->buf_double = new double[BUFFER_SIZE]; 236 out_comm_hdls[0].my_buffer->buf_float = new float[BUFFER_SIZE]; 237 out_comm_hdls[0].my_buffer->buf_int = new int[BUFFER_SIZE]; 238 out_comm_hdls[0].my_buffer->buf_long = new long[BUFFER_SIZE]; 239 out_comm_hdls[0].my_buffer->buf_ulong = new unsigned long[BUFFER_SIZE]; 240 out_comm_hdls[0].my_buffer->buf_char = new char[BUFFER_SIZE]; 241 241 242 242 out_comm_hdls[0].rank_map = new RANK_MAP; -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_exscan.cpp
r1287 r1289 9 9 #include <mpi.h> 10 10 #include "ep_declaration.hpp" 11 #include "ep_mpi.hpp"12 11 13 12 using namespace std; … … 27 26 } 28 27 29 template<typename T> 30 void reduce_max(const T * buffer, T* recvbuf, int count) 31 { 32 transform(buffer, buffer+count, recvbuf, recvbuf, max_op<T>); 33 } 34 35 template<typename T> 36 void reduce_min(const T * buffer, T* recvbuf, int count) 37 { 38 transform(buffer, buffer+count, recvbuf, recvbuf, min_op<T>); 39 } 40 41 template<typename T> 42 void reduce_sum(const T * buffer, T* recvbuf, int count) 43 { 44 transform(buffer, buffer+count, recvbuf, recvbuf, std::plus<T>()); 45 } 46 47 48 int MPI_Exscan_local(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 49 { 50 valid_op(op); 51 52 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 53 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 54 int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 28 int MPI_Exscan_local2(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 29 { 30 if(datatype == MPI_INT) 31 { 32 return MPI_Exscan_local_int(sendbuf, recvbuf, count, op, comm); 33 } 34 else if(datatype == MPI_FLOAT) 35 { 36 return MPI_Exscan_local_float(sendbuf, recvbuf, count, op, comm); 37 } 38 else if(datatype == MPI_DOUBLE) 39 { 40 return MPI_Exscan_local_double(sendbuf, recvbuf, count, op, comm); 41 } 42 else if(datatype == MPI_LONG) 43 { 44 return MPI_Exscan_local_long(sendbuf, recvbuf, count, op, comm); 45 } 46 else if(datatype == MPI_UNSIGNED_LONG) 47 { 48 return MPI_Exscan_local_ulong(sendbuf, recvbuf, count, op, comm); 49 } 50 else if(datatype == MPI_CHAR) 51 { 52 return MPI_Exscan_local_char(sendbuf, recvbuf, count, op, comm); 53 } 54 else 55 { 56 printf("MPI_Exscan Datatype not supported!\n"); 57 exit(0); 58 } 59 } 60 61 62 63 64 int MPI_Exscan_local_int(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 65 { 66 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 67 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 68 69 int *buffer = comm.ep_comm_ptr->comm_list->my_buffer->buf_int; 70 int *send_buf = static_cast<int*>(const_cast<void*>(sendbuf)); 71 int *recv_buf = static_cast<int*>(recvbuf); 72 73 for(int j=0; j<count; j+=BUFFER_SIZE) 74 { 75 76 if(my_rank == 0) 77 { 78 79 #pragma omp critical (write_to_buffer) 80 { 81 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 82 fill(recv_buf+j, recv_buf+j+min(BUFFER_SIZE, count-j), MPI_UNDEFINED); 83 #pragma omp flush 84 } 85 } 86 87 MPI_Barrier_local(comm); 88 89 for(int k=1; k<num_ep; k++) 90 { 91 #pragma omp critical (write_to_buffer) 92 { 93 if(my_rank == k) 94 { 95 #pragma omp flush 96 if(op == MPI_SUM) 97 { 98 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 99 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<int>()); 100 101 } 102 else if(op == MPI_MAX) 103 { 104 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 105 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<int>); 106 } 107 else if(op == MPI_MIN) 108 { 109 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 110 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<int>); 111 } 112 else 113 { 114 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 115 exit(1); 116 } 117 #pragma omp flush 118 } 119 } 120 121 MPI_Barrier_local(comm); 122 } 123 } 124 125 } 126 127 int MPI_Exscan_local_float(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 128 { 129 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 130 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 131 132 float *buffer = comm.ep_comm_ptr->comm_list->my_buffer->buf_float; 133 float *send_buf = static_cast<float*>(const_cast<void*>(sendbuf)); 134 float *recv_buf = static_cast<float*>(recvbuf); 135 136 for(int j=0; j<count; j+=BUFFER_SIZE) 137 { 138 if(my_rank == 0) 139 { 140 141 #pragma omp critical (write_to_buffer) 142 { 143 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 144 fill(recv_buf+j, recv_buf+j+min(BUFFER_SIZE, count-j), MPI_UNDEFINED); 145 #pragma omp flush 146 } 147 } 148 149 MPI_Barrier_local(comm); 150 151 for(int k=1; k<num_ep; k++) 152 { 153 #pragma omp critical (write_to_buffer) 154 { 155 if(my_rank == k) 156 { 157 #pragma omp flush 158 if(op == MPI_SUM) 159 { 160 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 161 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<float>()); 162 } 163 else if(op == MPI_MAX) 164 { 165 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 166 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<float>); 167 } 168 else if(op == MPI_MIN) 169 { 170 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 171 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<float>); 172 } 173 else 174 { 175 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 176 exit(1); 177 } 178 #pragma omp flush 179 } 180 } 181 182 MPI_Barrier_local(comm); 183 } 184 } 185 } 186 187 int MPI_Exscan_local_double(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 188 { 189 190 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 191 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 192 193 double *buffer = comm.ep_comm_ptr->comm_list->my_buffer->buf_double; 194 double *send_buf = static_cast<double*>(const_cast<void*>(sendbuf)); 195 double *recv_buf = static_cast<double*>(recvbuf); 196 197 for(int j=0; j<count; j+=BUFFER_SIZE) 198 { 199 if(my_rank == 0) 200 { 201 202 #pragma omp critical (write_to_buffer) 203 { 204 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 205 fill(recv_buf+j, recv_buf+j+min(BUFFER_SIZE, count-j), MPI_UNDEFINED); 206 #pragma omp flush 207 } 208 } 209 210 MPI_Barrier_local(comm); 211 212 for(int k=1; k<num_ep; k++) 213 { 214 #pragma omp critical (write_to_buffer) 215 { 216 if(my_rank == k) 217 { 218 #pragma omp flush 219 if(op == MPI_SUM) 220 { 221 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 222 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<double>()); 223 } 224 else if(op == MPI_MAX) 225 { 226 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 227 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<double>); 228 } 229 else if(op == MPI_MIN) 230 { 231 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 232 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<double>); 233 } 234 else 235 { 236 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 237 exit(1); 238 } 239 #pragma omp flush 240 } 241 } 242 243 MPI_Barrier_local(comm); 244 } 245 } 246 } 247 248 int MPI_Exscan_local_long(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 249 { 250 251 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 252 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 253 254 long *buffer = comm.ep_comm_ptr->comm_list->my_buffer->buf_long; 255 long *send_buf = static_cast<long*>(const_cast<void*>(sendbuf)); 256 long *recv_buf = static_cast<long*>(recvbuf); 257 258 for(int j=0; j<count; j+=BUFFER_SIZE) 259 { 260 if(my_rank == 0) 261 { 262 263 #pragma omp critical (write_to_buffer) 264 { 265 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 266 fill(recv_buf+j, recv_buf+j+min(BUFFER_SIZE, count-j), MPI_UNDEFINED); 267 #pragma omp flush 268 } 269 } 270 271 MPI_Barrier_local(comm); 272 273 for(int k=1; k<num_ep; k++) 274 { 275 #pragma omp critical (write_to_buffer) 276 { 277 if(my_rank == k) 278 { 279 #pragma omp flush 280 if(op == MPI_SUM) 281 { 282 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 283 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<long>()); 284 } 285 else if(op == MPI_MAX) 286 { 287 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 288 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<long>); 289 } 290 else if(op == MPI_MIN) 291 { 292 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 293 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<long>); 294 } 295 else 296 { 297 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 298 exit(1); 299 } 300 #pragma omp flush 301 } 302 } 303 304 MPI_Barrier_local(comm); 305 } 306 } 307 } 308 309 int MPI_Exscan_local_ulong(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 310 { 311 312 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 313 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 314 315 unsigned long *buffer = comm.ep_comm_ptr->comm_list->my_buffer->buf_ulong; 316 unsigned long *send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf)); 317 unsigned long *recv_buf = static_cast<unsigned long*>(recvbuf); 318 319 for(int j=0; j<count; j+=BUFFER_SIZE) 320 { 321 if(my_rank == 0) 322 { 323 324 #pragma omp critical (write_to_buffer) 325 { 326 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 327 fill(recv_buf+j, recv_buf+j+min(BUFFER_SIZE, count-j), MPI_UNDEFINED); 328 #pragma omp flush 329 } 330 } 331 332 MPI_Barrier_local(comm); 333 334 for(int k=1; k<num_ep; k++) 335 { 336 #pragma omp critical (write_to_buffer) 337 { 338 if(my_rank == k) 339 { 340 #pragma omp flush 341 if(op == MPI_SUM) 342 { 343 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 344 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<unsigned long>()); 345 } 346 else if(op == MPI_MAX) 347 { 348 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 349 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<unsigned long>); 350 } 351 else if(op == MPI_MIN) 352 { 353 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 354 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<unsigned long>); 355 } 356 else 357 { 358 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 359 exit(1); 360 } 361 #pragma omp flush 362 } 363 } 364 365 MPI_Barrier_local(comm); 366 } 367 } 368 } 369 370 int MPI_Exscan_local_char(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 371 { 372 373 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 374 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 375 376 char *buffer = comm.ep_comm_ptr->comm_list->my_buffer->buf_char; 377 char *send_buf = static_cast<char*>(const_cast<void*>(sendbuf)); 378 char *recv_buf = static_cast<char*>(recvbuf); 379 380 for(int j=0; j<count; j+=BUFFER_SIZE) 381 { 382 if(my_rank == 0) 383 { 384 385 #pragma omp critical (write_to_buffer) 386 { 387 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 388 fill(recv_buf+j, recv_buf+j+min(BUFFER_SIZE, count-j), MPI_UNDEFINED); 389 #pragma omp flush 390 } 391 } 392 393 MPI_Barrier_local(comm); 394 395 for(int k=1; k<num_ep; k++) 396 { 397 #pragma omp critical (write_to_buffer) 398 { 399 if(my_rank == k) 400 { 401 #pragma omp flush 402 if(op == MPI_SUM) 403 { 404 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 405 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<char>()); 406 } 407 else if(op == MPI_MAX) 408 { 409 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 410 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<char>); 411 } 412 else if(op == MPI_MIN) 413 { 414 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 415 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<char>); 416 } 417 else 418 { 419 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 420 exit(1); 421 } 422 #pragma omp flush 423 } 424 } 425 426 MPI_Barrier_local(comm); 427 } 428 } 429 } 430 431 432 int MPI_Exscan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 433 { 434 435 if(!comm.is_ep) 436 { 437 ::MPI_Exscan(const_cast<void*>(sendbuf), recvbuf, count, static_cast< ::MPI_Datatype>(datatype), 438 static_cast< ::MPI_Op>(op), static_cast< ::MPI_Comm>(comm.mpi_comm)); 439 return 0; 440 } 441 if(!comm.mpi_comm) return 0; 442 443 int ep_rank, ep_rank_loc, mpi_rank; 444 int ep_size, num_ep, mpi_size; 445 446 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 447 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 448 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 449 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 450 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 451 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 452 453 454 455 ::MPI_Aint datasize, lb; 55 456 56 57 ::MPI_Aint datasize, lb; 58 ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 59 60 if(ep_rank_loc == 0 && mpi_rank != 0) 61 { 62 comm.my_buffer->void_buffer[0] = recvbuf; 63 } 64 if(ep_rank_loc == 0 && mpi_rank == 0) 65 { 66 comm.my_buffer->void_buffer[0] = const_cast<void*>(sendbuf); 67 } 68 69 70 MPI_Barrier_local(comm); 71 72 memcpy(recvbuf, comm.my_buffer->void_buffer[0], datasize*count); 73 74 MPI_Barrier_local(comm); 75 76 comm.my_buffer->void_buffer[ep_rank_loc] = const_cast<void*>(sendbuf); 77 78 MPI_Barrier_local(comm); 79 80 if(op == MPI_SUM) 81 { 82 if(datatype == MPI_INT && datasize == sizeof(int)) 83 { 84 for(int i=0; i<ep_rank_loc; i++) 85 reduce_sum<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 86 } 87 88 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 89 { 90 for(int i=0; i<ep_rank_loc; i++) 91 reduce_sum<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 92 } 93 94 95 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 96 { 97 for(int i=0; i<ep_rank_loc; i++) 98 reduce_sum<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 99 } 100 101 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 102 { 103 for(int i=0; i<ep_rank_loc; i++) 104 reduce_sum<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 105 } 106 107 else if(datatype == MPI_LONG && datasize == sizeof(long)) 108 { 109 for(int i=0; i<ep_rank_loc; i++) 110 reduce_sum<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 111 } 112 113 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 114 { 115 for(int i=0; i<ep_rank_loc; i++) 116 reduce_sum<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 117 } 118 119 else printf("datatype Error\n"); 120 121 122 } 123 124 else if(op == MPI_MAX) 125 { 126 if(datatype == MPI_INT && datasize == sizeof(int)) 127 for(int i=0; i<ep_rank_loc; i++) 128 reduce_max<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 129 130 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 131 for(int i=0; i<ep_rank_loc; i++) 132 reduce_max<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 133 134 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 135 for(int i=0; i<ep_rank_loc; i++) 136 reduce_max<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 137 138 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 139 for(int i=0; i<ep_rank_loc; i++) 140 reduce_max<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 141 142 else if(datatype == MPI_LONG && datasize == sizeof(long)) 143 for(int i=0; i<ep_rank_loc; i++) 144 reduce_max<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 145 146 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 147 for(int i=0; i<ep_rank_loc; i++) 148 reduce_max<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 149 150 else printf("datatype Error\n"); 151 } 152 153 else //if(op == MPI_MIN) 154 { 155 if(datatype == MPI_INT && datasize == sizeof(int)) 156 for(int i=0; i<ep_rank_loc; i++) 157 reduce_min<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 158 159 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 160 for(int i=0; i<ep_rank_loc; i++) 161 reduce_min<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 162 163 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 164 for(int i=0; i<ep_rank_loc; i++) 165 reduce_min<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 166 167 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 168 for(int i=0; i<ep_rank_loc; i++) 169 reduce_min<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 170 171 else if(datatype == MPI_LONG && datasize == sizeof(long)) 172 for(int i=0; i<ep_rank_loc; i++) 173 reduce_min<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 174 175 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 176 for(int i=0; i<ep_rank_loc; i++) 177 reduce_min<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 178 179 else printf("datatype Error\n"); 180 } 181 182 MPI_Barrier_local(comm); 183 184 } 185 186 int MPI_Exscan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 187 { 188 if(!comm.is_ep) 189 { 190 return ::MPI_Scan(sendbuf, recvbuf, count, to_mpi_type(datatype), to_mpi_op(op), to_mpi_comm(comm.mpi_comm)); 191 } 192 193 valid_type(datatype); 194 195 int ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 196 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 197 int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 198 int ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 199 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 200 int mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 201 202 ::MPI_Aint datasize, lb; 203 ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 204 205 void* tmp_sendbuf; 206 tmp_sendbuf = new void*[datasize * count]; 207 208 int my_src = 0; 209 int my_dst = ep_rank; 210 211 std::vector<int> my_map(mpi_size, 0); 212 213 for(int i=0; i<comm.rank_map->size(); i++) my_map[comm.rank_map->at(i).second]++; 214 215 for(int i=0; i<mpi_rank; i++) my_src += my_map[i]; 216 my_src += ep_rank_loc; 217 218 219 for(int i=0; i<mpi_size; i++) 220 { 221 if(my_dst < my_map[i]) 222 { 223 my_dst = get_ep_rank(comm, my_dst, i); 224 break; 225 } 226 else 227 my_dst -= my_map[i]; 228 } 229 230 if(ep_rank != my_dst) 231 { 232 MPI_Request request[2]; 233 MPI_Status status[2]; 234 235 MPI_Isend(sendbuf, count, datatype, my_dst, my_dst, comm, &request[0]); 236 237 MPI_Irecv(tmp_sendbuf, count, datatype, my_src, ep_rank, comm, &request[1]); 238 239 MPI_Waitall(2, request, status); 240 } 241 242 else memcpy(tmp_sendbuf, sendbuf, datasize*count); 243 244 245 void* tmp_recvbuf; 246 tmp_recvbuf = new void*[datasize * count]; 247 248 MPI_Reduce_local(tmp_sendbuf, tmp_recvbuf, count, datatype, op, 0, comm); 457 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 458 459 void* local_scan_recvbuf; 460 local_scan_recvbuf = new void*[datasize * count]; 461 462 463 // local scan 464 MPI_Exscan_local2(sendbuf, recvbuf, count, datatype, op, comm); 465 466 // MPI_scan 467 void* local_sum; 468 void* mpi_scan_recvbuf; 469 470 471 mpi_scan_recvbuf = new void*[datasize*count]; 249 472 250 473 if(ep_rank_loc == 0) 251 ::MPI_Exscan(MPI_IN_PLACE, tmp_recvbuf, count, to_mpi_type(datatype), to_mpi_op(op), to_mpi_comm(comm.mpi_comm)); 252 253 // printf(" ID=%d : %d %d \n", ep_rank, static_cast<int*>(tmp_recvbuf)[0], static_cast<int*>(tmp_recvbuf)[1]); 254 255 MPI_Exscan_local(tmp_sendbuf, tmp_recvbuf, count, datatype, op, comm); 256 257 // printf(" ID=%d : after local tmp_sendbuf = %d %d ; tmp_recvbuf = %d %d \n", ep_rank, static_cast<int*>(tmp_sendbuf)[0], static_cast<int*>(tmp_sendbuf)[1], static_cast<int*>(tmp_recvbuf)[0], static_cast<int*>(tmp_recvbuf)[1]); 258 259 260 261 if(ep_rank != my_src) 262 { 263 MPI_Request request[2]; 264 MPI_Status status[2]; 265 266 MPI_Isend(tmp_recvbuf, count, datatype, my_src, my_src, comm, &request[0]); 267 268 MPI_Irecv(recvbuf, count, datatype, my_dst, ep_rank, comm, &request[1]); 269 270 MPI_Waitall(2, request, status); 271 } 272 273 else memcpy(recvbuf, tmp_recvbuf, datasize*count); 274 275 276 277 278 delete[] tmp_sendbuf; 279 delete[] tmp_recvbuf; 280 281 } 474 { 475 local_sum = new void*[datasize*count]; 476 } 477 478 479 MPI_Reduce_local2(sendbuf, local_sum, count, datatype, op, comm); 480 481 if(ep_rank_loc == 0) 482 { 483 ::MPI_Exscan(local_sum, mpi_scan_recvbuf, count, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Op>(op), static_cast< ::MPI_Comm>(comm.mpi_comm)); 484 } 485 486 487 if(mpi_rank > 0) 488 { 489 MPI_Bcast_local2(mpi_scan_recvbuf, count, datatype, comm); 490 } 491 492 493 if(datatype == MPI_DOUBLE) 494 { 495 double* sum_buf = static_cast<double*>(mpi_scan_recvbuf); 496 double* recv_buf = static_cast<double*>(recvbuf); 497 498 if(mpi_rank != 0) 499 { 500 if(op == MPI_SUM) 501 { 502 if(ep_rank_loc == 0) 503 { 504 copy(sum_buf, sum_buf+count, recv_buf); 505 } 506 else 507 { 508 for(int i=0; i<count; i++) 509 { 510 recv_buf[i] += sum_buf[i]; 511 } 512 } 513 } 514 else if (op == MPI_MAX) 515 { 516 if(ep_rank_loc == 0) 517 { 518 copy(sum_buf, sum_buf+count, recv_buf); 519 } 520 else 521 { 522 for(int i=0; i<count; i++) 523 { 524 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 525 } 526 } 527 } 528 else if(op == MPI_MIN) 529 { 530 if(ep_rank_loc == 0) 531 { 532 copy(sum_buf, sum_buf+count, recv_buf); 533 } 534 else 535 { 536 for(int i=0; i<count; i++) 537 { 538 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 539 } 540 } 541 } 542 else 543 { 544 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 545 exit(1); 546 } 547 } 548 549 delete[] static_cast<double*>(mpi_scan_recvbuf); 550 if(ep_rank_loc == 0) 551 { 552 delete[] static_cast<double*>(local_sum); 553 } 554 } 555 556 else if(datatype == MPI_FLOAT) 557 { 558 float* sum_buf = static_cast<float*>(mpi_scan_recvbuf); 559 float* recv_buf = static_cast<float*>(recvbuf); 560 561 if(mpi_rank != 0) 562 { 563 if(op == MPI_SUM) 564 { 565 if(ep_rank_loc == 0) 566 { 567 copy(sum_buf, sum_buf+count, recv_buf); 568 } 569 else 570 { 571 for(int i=0; i<count; i++) 572 { 573 recv_buf[i] += sum_buf[i]; 574 } 575 } 576 } 577 else if (op == MPI_MAX) 578 { 579 if(ep_rank_loc == 0) 580 { 581 copy(sum_buf, sum_buf+count, recv_buf); 582 } 583 else 584 { 585 for(int i=0; i<count; i++) 586 { 587 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 588 } 589 } 590 } 591 else if(op == MPI_MIN) 592 { 593 if(ep_rank_loc == 0) 594 { 595 copy(sum_buf, sum_buf+count, recv_buf); 596 } 597 else 598 { 599 for(int i=0; i<count; i++) 600 { 601 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 602 } 603 } 604 } 605 else 606 { 607 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 608 exit(1); 609 } 610 } 611 612 delete[] static_cast<float*>(mpi_scan_recvbuf); 613 if(ep_rank_loc == 0) 614 { 615 delete[] static_cast<float*>(local_sum); 616 } 617 } 618 619 else if(datatype == MPI_INT) 620 { 621 int* sum_buf = static_cast<int*>(mpi_scan_recvbuf); 622 int* recv_buf = static_cast<int*>(recvbuf); 623 624 if(mpi_rank != 0) 625 { 626 if(op == MPI_SUM) 627 { 628 if(ep_rank_loc == 0) 629 { 630 copy(sum_buf, sum_buf+count, recv_buf); 631 } 632 else 633 { 634 for(int i=0; i<count; i++) 635 { 636 recv_buf[i] += sum_buf[i]; 637 } 638 } 639 } 640 else if (op == MPI_MAX) 641 { 642 if(ep_rank_loc == 0) 643 { 644 copy(sum_buf, sum_buf+count, recv_buf); 645 } 646 else 647 { 648 for(int i=0; i<count; i++) 649 { 650 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 651 } 652 } 653 } 654 else if(op == MPI_MIN) 655 { 656 if(ep_rank_loc == 0) 657 { 658 copy(sum_buf, sum_buf+count, recv_buf); 659 } 660 else 661 { 662 for(int i=0; i<count; i++) 663 { 664 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 665 } 666 } 667 } 668 else 669 { 670 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 671 exit(1); 672 } 673 } 674 675 delete[] static_cast<int*>(mpi_scan_recvbuf); 676 if(ep_rank_loc == 0) 677 { 678 delete[] static_cast<int*>(local_sum); 679 } 680 } 681 682 else if(datatype == MPI_CHAR) 683 { 684 char* sum_buf = static_cast<char*>(mpi_scan_recvbuf); 685 char* recv_buf = static_cast<char*>(recvbuf); 686 687 if(mpi_rank != 0) 688 { 689 if(op == MPI_SUM) 690 { 691 if(ep_rank_loc == 0) 692 { 693 copy(sum_buf, sum_buf+count, recv_buf); 694 } 695 else 696 { 697 for(int i=0; i<count; i++) 698 { 699 recv_buf[i] += sum_buf[i]; 700 } 701 } 702 } 703 else if (op == MPI_MAX) 704 { 705 if(ep_rank_loc == 0) 706 { 707 copy(sum_buf, sum_buf+count, recv_buf); 708 } 709 else 710 { 711 for(int i=0; i<count; i++) 712 { 713 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 714 } 715 } 716 } 717 else if(op == MPI_MIN) 718 { 719 if(ep_rank_loc == 0) 720 { 721 copy(sum_buf, sum_buf+count, recv_buf); 722 } 723 else 724 { 725 for(int i=0; i<count; i++) 726 { 727 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 728 } 729 } 730 } 731 else 732 { 733 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 734 exit(1); 735 } 736 } 737 738 delete[] static_cast<char*>(mpi_scan_recvbuf); 739 if(ep_rank_loc == 0) 740 { 741 delete[] static_cast<char*>(local_sum); 742 } 743 } 744 745 else if(datatype == MPI_LONG) 746 { 747 long* sum_buf = static_cast<long*>(mpi_scan_recvbuf); 748 long* recv_buf = static_cast<long*>(recvbuf); 749 750 if(mpi_rank != 0) 751 { 752 if(op == MPI_SUM) 753 { 754 if(ep_rank_loc == 0) 755 { 756 copy(sum_buf, sum_buf+count, recv_buf); 757 } 758 else 759 { 760 for(int i=0; i<count; i++) 761 { 762 recv_buf[i] += sum_buf[i]; 763 } 764 } 765 } 766 else if (op == MPI_MAX) 767 { 768 if(ep_rank_loc == 0) 769 { 770 copy(sum_buf, sum_buf+count, recv_buf); 771 } 772 else 773 { 774 for(int i=0; i<count; i++) 775 { 776 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 777 } 778 } 779 } 780 else if(op == MPI_MIN) 781 { 782 if(ep_rank_loc == 0) 783 { 784 copy(sum_buf, sum_buf+count, recv_buf); 785 } 786 else 787 { 788 for(int i=0; i<count; i++) 789 { 790 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 791 } 792 } 793 } 794 else 795 { 796 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 797 exit(1); 798 } 799 } 800 801 delete[] static_cast<long*>(mpi_scan_recvbuf); 802 if(ep_rank_loc == 0) 803 { 804 delete[] static_cast<long*>(local_sum); 805 } 806 } 807 808 else if(datatype == MPI_UNSIGNED_LONG) 809 { 810 unsigned long* sum_buf = static_cast<unsigned long*>(mpi_scan_recvbuf); 811 unsigned long* recv_buf = static_cast<unsigned long*>(recvbuf); 812 813 if(mpi_rank != 0) 814 { 815 if(op == MPI_SUM) 816 { 817 if(ep_rank_loc == 0) 818 { 819 copy(sum_buf, sum_buf+count, recv_buf); 820 } 821 else 822 { 823 for(int i=0; i<count; i++) 824 { 825 recv_buf[i] += sum_buf[i]; 826 } 827 } 828 } 829 else if (op == MPI_MAX) 830 { 831 if(ep_rank_loc == 0) 832 { 833 copy(sum_buf, sum_buf+count, recv_buf); 834 } 835 else 836 { 837 for(int i=0; i<count; i++) 838 { 839 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 840 } 841 } 842 } 843 else if(op == MPI_MIN) 844 { 845 if(ep_rank_loc == 0) 846 { 847 copy(sum_buf, sum_buf+count, recv_buf); 848 } 849 else 850 { 851 for(int i=0; i<count; i++) 852 { 853 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 854 } 855 } 856 } 857 else 858 { 859 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 860 exit(1); 861 } 862 } 863 864 delete[] static_cast<unsigned long*>(mpi_scan_recvbuf); 865 if(ep_rank_loc == 0) 866 { 867 delete[] static_cast<unsigned long*>(local_sum); 868 } 869 } 870 871 872 } 873 874 282 875 283 876 } -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_free.cpp
r1287 r1289 5 5 namespace ep_lib 6 6 { 7 8 7 9 8 10 int MPI_Comm_free(MPI_Comm *comm) … … 39 41 Debug("comm is EP, mpi_comm_ptr != NULL\n"); 40 42 43 if(comm->my_buffer != NULL) 44 { 45 if(comm->my_buffer->buf_int != NULL) delete[] comm->my_buffer->buf_int; Debug("buf_int freed\n"); 46 if(comm->my_buffer->buf_float != NULL) delete[] comm->my_buffer->buf_float; Debug("buf_float freed\n"); 47 if(comm->my_buffer->buf_double != NULL) delete[] comm->my_buffer->buf_double; Debug("buf_double freed\n"); 48 if(comm->my_buffer->buf_long != NULL) delete[] comm->my_buffer->buf_long; Debug("buf_long freed\n"); 49 if(comm->my_buffer->buf_ulong != NULL) delete[] comm->my_buffer->buf_ulong; Debug("buf_ulong freed\n"); 50 if(comm->my_buffer->buf_char != NULL) delete[] comm->my_buffer->buf_char; Debug("buf_char freed\n"); 51 } 41 52 42 53 if(comm->ep_barrier != NULL) … … 97 108 Debug("comm is EP, mpi_comm_ptr != NULL\n"); 98 109 110 if(comm->my_buffer != NULL) 111 { 112 if(comm->my_buffer->buf_int != NULL) delete[] comm->my_buffer->buf_int; Debug("buf_int freed\n"); 113 if(comm->my_buffer->buf_float != NULL) delete[] comm->my_buffer->buf_float; Debug("buf_float freed\n"); 114 if(comm->my_buffer->buf_double != NULL) delete[] comm->my_buffer->buf_double; Debug("buf_double freed\n"); 115 if(comm->my_buffer->buf_long != NULL) delete[] comm->my_buffer->buf_long; Debug("buf_long freed\n"); 116 if(comm->my_buffer->buf_ulong != NULL) delete[] comm->my_buffer->buf_ulong; Debug("buf_ulong freed\n"); 117 if(comm->my_buffer->buf_char != NULL) delete[] comm->my_buffer->buf_char; Debug("buf_char freed\n"); 118 } 99 119 100 120 if(comm->ep_barrier != NULL) -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_gather.cpp
r1287 r1289 15 15 namespace ep_lib 16 16 { 17 18 17 int MPI_Gather_local(const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, int local_root, MPI_Comm comm) 19 18 { … … 128 127 } 129 128 129 // int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) 130 // { 131 132 // if(!comm.is_ep && comm.mpi_comm) 133 // { 134 // ::MPI_Allgather(const_cast<void*>(sendbuf), sendcount, static_cast< ::MPI_Datatype>(sendtype), recvbuf, recvcount, static_cast< ::MPI_Datatype>(recvtype), 135 // static_cast< ::MPI_Comm>(comm.mpi_comm)); 136 // return 0; 137 // } 138 139 // if(!comm.mpi_comm) return 0; 140 141 // assert(sendcount == recvcount); 142 143 // assert(valid_type(sendtype) && valid_type(recvtype)); 144 145 // MPI_Datatype datatype = sendtype; 146 // int count = sendcount; 147 148 // ::MPI_Aint datasize, lb; 149 150 // ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 151 152 153 // int ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 154 // int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 155 // int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 156 // int ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 157 // int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 158 // int mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 159 160 // bool is_master = ep_rank_loc==0; 161 162 // void* local_recvbuf; 163 // void* tmp_recvbuf; 164 165 166 // if(is_master) 167 // { 168 // local_recvbuf = new void*[datasize * num_ep * count]; 169 // tmp_recvbuf = new void*[datasize * count * ep_size]; 170 // } 171 172 // MPI_Gather_local(sendbuf, count, datatype, local_recvbuf, 0, comm); 173 174 175 // int* mpi_recvcounts; 176 // int *mpi_displs; 177 178 // if(is_master) 179 // { 180 181 // mpi_recvcounts = new int[mpi_size]; 182 // mpi_displs = new int[mpi_size]; 183 184 // int local_sendcount = num_ep * count; 185 186 // ::MPI_Allgather(&local_sendcount, 1, to_mpi_type(MPI_INT), mpi_recvcounts, 1, to_mpi_type(MPI_INT), to_mpi_comm(comm.mpi_comm)); 187 188 // mpi_displs[0] = 0; 189 // for(int i=1; i<mpi_size; i++) 190 // { 191 // mpi_displs[i] = mpi_displs[i-1] + mpi_recvcounts[i-1]; 192 // } 193 194 195 // ::MPI_Allgatherv(local_recvbuf, num_ep * count, to_mpi_type(datatype), tmp_recvbuf, mpi_recvcounts, mpi_displs, to_mpi_type(datatype), to_mpi_comm(comm.mpi_comm)); 196 197 198 // // reorder 199 // int offset; 200 // for(int i=0; i<ep_size; i++) 201 // { 202 // offset = mpi_displs[comm.rank_map->at(i).second] + comm.rank_map->at(i).first * sendcount; 203 // memcpy(recvbuf + i*sendcount*datasize, tmp_recvbuf+offset*datasize, sendcount*datasize); 204 // } 205 206 // delete[] mpi_recvcounts; 207 // delete[] mpi_displs; 208 // } 209 210 // MPI_Bcast_local(recvbuf, count*ep_size, datatype, 0, comm); 211 212 // MPI_Barrier(comm); 213 214 215 // if(is_master) 216 // { 217 // delete[] local_recvbuf; 218 // delete[] tmp_recvbuf; 219 220 // } 221 222 // } 223 224 int MPI_Gather_local2(const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, MPI_Comm comm) 225 { 226 if(datatype == MPI_INT) 227 { 228 Debug("datatype is INT\n"); 229 return MPI_Gather_local_int(sendbuf, count, recvbuf, comm); 230 } 231 else if(datatype == MPI_FLOAT) 232 { 233 Debug("datatype is FLOAT\n"); 234 return MPI_Gather_local_float(sendbuf, count, recvbuf, comm); 235 } 236 else if(datatype == MPI_DOUBLE) 237 { 238 Debug("datatype is DOUBLE\n"); 239 return MPI_Gather_local_double(sendbuf, count, recvbuf, comm); 240 } 241 else if(datatype == MPI_LONG) 242 { 243 Debug("datatype is LONG\n"); 244 return MPI_Gather_local_long(sendbuf, count, recvbuf, comm); 245 } 246 else if(datatype == MPI_UNSIGNED_LONG) 247 { 248 Debug("datatype is uLONG\n"); 249 return MPI_Gather_local_ulong(sendbuf, count, recvbuf, comm); 250 } 251 else if(datatype == MPI_CHAR) 252 { 253 Debug("datatype is CHAR\n"); 254 return MPI_Gather_local_char(sendbuf, count, recvbuf, comm); 255 } 256 else 257 { 258 printf("MPI_Gather Datatype not supported!\n"); 259 exit(0); 260 } 261 } 262 263 int MPI_Gather_local_int(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 264 { 265 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 266 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 267 268 int *buffer = comm.my_buffer->buf_int; 269 int *send_buf = static_cast<int*>(const_cast<void*>(sendbuf)); 270 int *recv_buf = static_cast<int*>(recvbuf); 271 272 if(my_rank == 0) 273 { 274 copy(send_buf, send_buf+count, recv_buf); 275 } 276 277 for(int j=0; j<count; j+=BUFFER_SIZE) 278 { 279 for(int k=1; k<num_ep; k++) 280 { 281 if(my_rank == k) 282 { 283 #pragma omp critical (write_to_buffer) 284 { 285 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 286 #pragma omp flush 287 } 288 } 289 290 MPI_Barrier_local(comm); 291 292 if(my_rank == 0) 293 { 294 #pragma omp flush 295 #pragma omp critical (read_from_buffer) 296 { 297 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j+k*count); 298 } 299 } 300 301 MPI_Barrier_local(comm); 302 } 303 } 304 } 305 306 int MPI_Gather_local_float(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 307 { 308 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 309 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 310 311 float *buffer = comm.my_buffer->buf_float; 312 float *send_buf = static_cast<float*>(const_cast<void*>(sendbuf)); 313 float *recv_buf = static_cast<float*>(recvbuf); 314 315 if(my_rank == 0) 316 { 317 copy(send_buf, send_buf+count, recv_buf); 318 } 319 320 for(int j=0; j<count; j+=BUFFER_SIZE) 321 { 322 for(int k=1; k<num_ep; k++) 323 { 324 if(my_rank == k) 325 { 326 #pragma omp critical (write_to_buffer) 327 { 328 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 329 #pragma omp flush 330 } 331 } 332 333 MPI_Barrier_local(comm); 334 335 if(my_rank == 0) 336 { 337 #pragma omp flush 338 #pragma omp critical (read_from_buffer) 339 { 340 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j+k*count); 341 } 342 } 343 344 MPI_Barrier_local(comm); 345 } 346 } 347 } 348 349 int MPI_Gather_local_double(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 350 { 351 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 352 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 353 354 double *buffer = comm.my_buffer->buf_double; 355 double *send_buf = static_cast<double*>(const_cast<void*>(sendbuf)); 356 double *recv_buf = static_cast<double*>(recvbuf); 357 358 if(my_rank == 0) 359 { 360 copy(send_buf, send_buf+count, recv_buf); 361 } 362 363 for(int j=0; j<count; j+=BUFFER_SIZE) 364 { 365 for(int k=1; k<num_ep; k++) 366 { 367 if(my_rank == k) 368 { 369 #pragma omp critical (write_to_buffer) 370 { 371 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 372 #pragma omp flush 373 } 374 } 375 376 MPI_Barrier_local(comm); 377 378 if(my_rank == 0) 379 { 380 #pragma omp flush 381 #pragma omp critical (read_from_buffer) 382 { 383 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j+k*count); 384 } 385 } 386 387 MPI_Barrier_local(comm); 388 } 389 } 390 } 391 392 int MPI_Gather_local_long(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 393 { 394 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 395 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 396 397 long *buffer = comm.my_buffer->buf_long; 398 long *send_buf = static_cast<long*>(const_cast<void*>(sendbuf)); 399 long *recv_buf = static_cast<long*>(recvbuf); 400 401 if(my_rank == 0) 402 { 403 copy(send_buf, send_buf+count, recv_buf); 404 } 405 406 for(int j=0; j<count; j+=BUFFER_SIZE) 407 { 408 for(int k=1; k<num_ep; k++) 409 { 410 if(my_rank == k) 411 { 412 #pragma omp critical (write_to_buffer) 413 { 414 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 415 #pragma omp flush 416 } 417 } 418 419 MPI_Barrier_local(comm); 420 421 if(my_rank == 0) 422 { 423 #pragma omp flush 424 #pragma omp critical (read_from_buffer) 425 { 426 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j+k*count); 427 } 428 } 429 430 MPI_Barrier_local(comm); 431 } 432 } 433 } 434 435 int MPI_Gather_local_ulong(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 436 { 437 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 438 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 439 440 unsigned long *buffer = comm.my_buffer->buf_ulong; 441 unsigned long *send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf)); 442 unsigned long *recv_buf = static_cast<unsigned long*>(recvbuf); 443 444 if(my_rank == 0) 445 { 446 copy(send_buf, send_buf+count, recv_buf); 447 } 448 449 for(int j=0; j<count; j+=BUFFER_SIZE) 450 { 451 for(int k=1; k<num_ep; k++) 452 { 453 if(my_rank == k) 454 { 455 #pragma omp critical (write_to_buffer) 456 { 457 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 458 #pragma omp flush 459 } 460 } 461 462 MPI_Barrier_local(comm); 463 464 if(my_rank == 0) 465 { 466 #pragma omp flush 467 #pragma omp critical (read_from_buffer) 468 { 469 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j+k*count); 470 } 471 } 472 473 MPI_Barrier_local(comm); 474 } 475 } 476 } 477 478 479 int MPI_Gather_local_char(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 480 { 481 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 482 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 483 484 char *buffer = comm.my_buffer->buf_char; 485 char *send_buf = static_cast<char*>(const_cast<void*>(sendbuf)); 486 char *recv_buf = static_cast<char*>(recvbuf); 487 488 if(my_rank == 0) 489 { 490 copy(send_buf, send_buf+count, recv_buf); 491 } 492 493 for(int j=0; j<count; j+=BUFFER_SIZE) 494 { 495 for(int k=1; k<num_ep; k++) 496 { 497 if(my_rank == k) 498 { 499 #pragma omp critical (write_to_buffer) 500 { 501 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 502 #pragma omp flush 503 } 504 } 505 506 MPI_Barrier_local(comm); 507 508 if(my_rank == 0) 509 { 510 #pragma omp flush 511 #pragma omp critical (read_from_buffer) 512 { 513 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j+k*count); 514 } 515 } 516 517 MPI_Barrier_local(comm); 518 } 519 } 520 } 521 522 523 524 int MPI_Gather2(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) 525 { 526 if(!comm.is_ep && comm.mpi_comm) 527 { 528 ::MPI_Gather(const_cast<void*>(sendbuf), sendcount, static_cast< ::MPI_Datatype>(sendtype), recvbuf, recvcount, static_cast< ::MPI_Datatype>(recvtype), 529 root, static_cast< ::MPI_Comm>(comm.mpi_comm)); 530 return 0; 531 } 532 533 if(!comm.mpi_comm) return 0; 534 535 MPI_Bcast(&recvcount, 1, MPI_INT, root, comm); 536 537 assert(static_cast< ::MPI_Datatype>(sendtype) == static_cast< ::MPI_Datatype>(recvtype) && sendcount == recvcount); 538 539 MPI_Datatype datatype = sendtype; 540 int count = sendcount; 541 542 int ep_rank, ep_rank_loc, mpi_rank; 543 int ep_size, num_ep, mpi_size; 544 545 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 546 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 547 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 548 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 549 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 550 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 551 552 553 int root_mpi_rank = comm.rank_map->at(root).second; 554 int root_ep_loc = comm.rank_map->at(root).first; 555 556 557 ::MPI_Aint datasize, lb; 558 559 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 560 561 void *local_gather_recvbuf; 562 void *master_recvbuf; 563 if(ep_rank_loc == 0 && mpi_rank == root_mpi_rank && root_ep_loc != 0) 564 { 565 master_recvbuf = new void*[datasize*ep_size*count]; 566 } 567 568 if(ep_rank_loc==0) 569 { 570 local_gather_recvbuf = new void*[datasize*num_ep*count]; 571 } 572 573 // local gather to master 574 MPI_Gather_local2(sendbuf, count, datatype, local_gather_recvbuf, comm); 575 576 //MPI_Gather 577 578 if(ep_rank_loc == 0) 579 { 580 int *gatherv_recvcnt; 581 int *gatherv_displs; 582 int gatherv_cnt = count*num_ep; 583 584 gatherv_recvcnt = new int[mpi_size]; 585 gatherv_displs = new int[mpi_size]; 586 587 588 ::MPI_Allgather(&gatherv_cnt, 1, MPI_INT, gatherv_recvcnt, 1, MPI_INT, static_cast< ::MPI_Comm>(comm.mpi_comm)); 589 590 gatherv_displs[0] = 0; 591 for(int i=1; i<mpi_size; i++) 592 { 593 gatherv_displs[i] = gatherv_recvcnt[i-1] + gatherv_displs[i-1]; 594 } 595 596 if(root_ep_loc != 0) // gather to root_master 597 { 598 ::MPI_Gatherv(local_gather_recvbuf, count*num_ep, static_cast< ::MPI_Datatype>(datatype), master_recvbuf, gatherv_recvcnt, 599 gatherv_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 600 } 601 else 602 { 603 ::MPI_Gatherv(local_gather_recvbuf, count*num_ep, static_cast< ::MPI_Datatype>(datatype), recvbuf, gatherv_recvcnt, 604 gatherv_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 605 } 606 607 delete[] gatherv_recvcnt; 608 delete[] gatherv_displs; 609 } 610 611 612 if(root_ep_loc != 0 && mpi_rank == root_mpi_rank) // root is not master, master send to root and root receive from master 613 { 614 innode_memcpy(0, master_recvbuf, root_ep_loc, recvbuf, count*ep_size, datatype, comm); 615 } 616 617 618 619 if(ep_rank_loc==0) 620 { 621 if(datatype == MPI_INT) 622 { 623 delete[] static_cast<int*>(local_gather_recvbuf); 624 } 625 else if(datatype == MPI_FLOAT) 626 { 627 delete[] static_cast<float*>(local_gather_recvbuf); 628 } 629 else if(datatype == MPI_DOUBLE) 630 { 631 delete[] static_cast<double*>(local_gather_recvbuf); 632 } 633 else if(datatype == MPI_CHAR) 634 { 635 delete[] static_cast<char*>(local_gather_recvbuf); 636 } 637 else if(datatype == MPI_LONG) 638 { 639 delete[] static_cast<long*>(local_gather_recvbuf); 640 } 641 else// if(datatype == MPI_UNSIGNED_LONG) 642 { 643 delete[] static_cast<unsigned long*>(local_gather_recvbuf); 644 } 645 646 if(root_ep_loc != 0 && mpi_rank == root_mpi_rank) delete[] master_recvbuf; 647 } 648 } 649 650 651 int MPI_Allgather2(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) 652 { 653 if(!comm.is_ep && comm.mpi_comm) 654 { 655 ::MPI_Allgather(const_cast<void*>(sendbuf), sendcount, static_cast< ::MPI_Datatype>(sendtype), recvbuf, recvcount, static_cast< ::MPI_Datatype>(recvtype), 656 static_cast< ::MPI_Comm>(comm.mpi_comm)); 657 return 0; 658 } 659 660 if(!comm.mpi_comm) return 0; 661 662 assert(static_cast< ::MPI_Datatype>(sendtype) == static_cast< ::MPI_Datatype>(recvtype) && sendcount == recvcount); 663 664 MPI_Datatype datatype = sendtype; 665 int count = sendcount; 666 667 int ep_rank, ep_rank_loc, mpi_rank; 668 int ep_size, num_ep, mpi_size; 669 670 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 671 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 672 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 673 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 674 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 675 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 676 677 678 ::MPI_Aint datasize, lb; 679 680 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 681 682 void *local_gather_recvbuf; 683 684 if(ep_rank_loc==0) 685 { 686 local_gather_recvbuf = new void*[datasize*num_ep*count]; 687 } 688 689 // local gather to master 690 MPI_Gather_local2(sendbuf, count, datatype, local_gather_recvbuf, comm); 691 692 //MPI_Gather 693 694 if(ep_rank_loc == 0) 695 { 696 int *gatherv_recvcnt; 697 int *gatherv_displs; 698 int gatherv_cnt = count*num_ep; 699 700 gatherv_recvcnt = new int[mpi_size]; 701 gatherv_displs = new int[mpi_size]; 702 703 ::MPI_Allgather(&gatherv_cnt, 1, MPI_INT, gatherv_recvcnt, 1, MPI_INT, static_cast< ::MPI_Comm>(comm.mpi_comm)); 704 705 gatherv_displs[0] = 0; 706 for(int i=1; i<mpi_size; i++) 707 { 708 gatherv_displs[i] = gatherv_recvcnt[i-1] + gatherv_displs[i-1]; 709 } 710 711 ::MPI_Allgatherv(local_gather_recvbuf, count*num_ep, static_cast< ::MPI_Datatype>(datatype), recvbuf, gatherv_recvcnt, 712 gatherv_displs, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Comm>(comm.mpi_comm)); 713 714 delete[] gatherv_recvcnt; 715 delete[] gatherv_displs; 716 } 717 718 MPI_Bcast_local2(recvbuf, count*ep_size, datatype, comm); 719 720 721 if(ep_rank_loc==0) 722 { 723 if(datatype == MPI_INT) 724 { 725 delete[] static_cast<int*>(local_gather_recvbuf); 726 } 727 else if(datatype == MPI_FLOAT) 728 { 729 delete[] static_cast<float*>(local_gather_recvbuf); 730 } 731 else if(datatype == MPI_DOUBLE) 732 { 733 delete[] static_cast<double*>(local_gather_recvbuf); 734 } 735 else if(datatype == MPI_CHAR) 736 { 737 delete[] static_cast<char*>(local_gather_recvbuf); 738 } 739 else if(datatype == MPI_LONG) 740 { 741 delete[] static_cast<long*>(local_gather_recvbuf); 742 } 743 else// if(datatype == MPI_UNSIGNED_LONG) 744 { 745 delete[] static_cast<unsigned long*>(local_gather_recvbuf); 746 } 747 } 748 } 749 750 130 751 } -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_gatherv.cpp
r1287 r1289 15 15 namespace ep_lib 16 16 { 17 18 int MPI_Gatherv_local(const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, const int recvcounts[], const int displs[], int local_root, MPI_Comm comm) 17 int MPI_Gatherv_local(const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, const int recvcounts[], const int displs[], int local_root, MPI_Comm comm) 19 18 { 20 19 assert(valid_type(datatype)); … … 186 185 } 187 186 187 // int MPI_Allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], MPI_Datatype recvtype, MPI_Comm comm) 188 // { 189 190 // if(!comm.is_ep && comm.mpi_comm) 191 // { 192 // ::MPI_Allgatherv(sendbuf, sendcount, to_mpi_type(sendtype), recvbuf, recvcounts, displs, to_mpi_type(recvtype), to_mpi_comm(comm.mpi_comm)); 193 // return 0; 194 // } 195 196 // if(!comm.mpi_comm) return 0; 197 198 199 200 201 // assert(valid_type(sendtype) && valid_type(recvtype)); 202 203 // MPI_Datatype datatype = sendtype; 204 // int count = sendcount; 205 206 // ::MPI_Aint datasize, lb; 207 208 // ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 209 210 211 // int ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 212 // int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 213 // int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 214 // int ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 215 // int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 216 // int mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 217 218 219 // assert(sendcount == recvcounts[ep_rank]); 220 221 // bool is_master = ep_rank_loc==0; 222 223 // void* local_recvbuf; 224 // void* tmp_recvbuf; 225 226 // int recvbuf_size = 0; 227 // for(int i=0; i<ep_size; i++) 228 // recvbuf_size = max(recvbuf_size, displs[i]+recvcounts[i]); 229 230 231 // vector<int>local_recvcounts(num_ep, 0); 232 // vector<int>local_displs(num_ep, 0); 233 234 // MPI_Gather_local(&sendcount, 1, MPI_INT, local_recvcounts.data(), 0, comm); 235 // for(int i=1; i<num_ep; i++) local_displs[i] = local_displs[i-1] + local_recvcounts[i-1]; 236 237 238 // if(is_master) 239 // { 240 // local_recvbuf = new void*[datasize * std::accumulate(local_recvcounts.begin(), local_recvcounts.begin()+num_ep, 0)]; 241 // tmp_recvbuf = new void*[datasize * std::accumulate(recvcounts, recvcounts+ep_size, 0)]; 242 // } 243 244 // MPI_Gatherv_local(sendbuf, count, datatype, local_recvbuf, local_recvcounts.data(), local_displs.data(), 0, comm); 245 246 247 // if(is_master) 248 // { 249 // std::vector<int>mpi_recvcounts(mpi_size, 0); 250 // std::vector<int>mpi_displs(mpi_size, 0); 251 252 // int local_sendcount = std::accumulate(local_recvcounts.begin(), local_recvcounts.begin()+num_ep, 0); 253 // MPI_Allgather(&local_sendcount, 1, MPI_INT, mpi_recvcounts.data(), 1, MPI_INT, to_mpi_comm(comm.mpi_comm)); 254 255 // for(int i=1; i<mpi_size; i++) 256 // mpi_displs[i] = mpi_displs[i-1] + mpi_recvcounts[i-1]; 257 258 259 // ::MPI_Allgatherv(local_recvbuf, local_sendcount, to_mpi_type(datatype), tmp_recvbuf, mpi_recvcounts.data(), mpi_displs.data(), to_mpi_type(datatype), to_mpi_comm(comm.mpi_comm)); 260 261 262 263 // // reorder 264 // int offset; 265 // for(int i=0; i<ep_size; i++) 266 // { 267 // int extra = 0; 268 // for(int j=0, k=0; j<ep_size, k<comm.rank_map->at(i).first; j++) 269 // if(comm.rank_map->at(i).second == comm.rank_map->at(j).second) 270 // { 271 // extra += recvcounts[j]; 272 // k++; 273 // } 274 275 // offset = mpi_displs[comm.rank_map->at(i).second] + extra; 276 277 // memcpy(recvbuf+displs[i]*datasize, tmp_recvbuf+offset*datasize, recvcounts[i]*datasize); 278 279 // } 280 281 // } 282 283 // MPI_Bcast_local(recvbuf, recvbuf_size, datatype, 0, comm); 284 285 // if(is_master) 286 // { 287 // delete[] local_recvbuf; 288 // delete[] tmp_recvbuf; 289 // } 290 291 // } 292 293 294 int MPI_Gatherv_local2(const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm) 295 { 296 if(datatype == MPI_INT) 297 { 298 Debug("datatype is INT\n"); 299 return MPI_Gatherv_local_int(sendbuf, count, recvbuf, recvcounts, displs, comm); 300 } 301 else if(datatype == MPI_FLOAT) 302 { 303 Debug("datatype is FLOAT\n"); 304 return MPI_Gatherv_local_float(sendbuf, count, recvbuf, recvcounts, displs, comm); 305 } 306 else if(datatype == MPI_DOUBLE) 307 { 308 Debug("datatype is DOUBLE\n"); 309 return MPI_Gatherv_local_double(sendbuf, count, recvbuf, recvcounts, displs, comm); 310 } 311 else if(datatype == MPI_LONG) 312 { 313 Debug("datatype is LONG\n"); 314 return MPI_Gatherv_local_long(sendbuf, count, recvbuf, recvcounts, displs, comm); 315 } 316 else if(datatype == MPI_UNSIGNED_LONG) 317 { 318 Debug("datatype is uLONG\n"); 319 return MPI_Gatherv_local_ulong(sendbuf, count, recvbuf, recvcounts, displs, comm); 320 } 321 else if(datatype == MPI_CHAR) 322 { 323 Debug("datatype is CHAR\n"); 324 return MPI_Gatherv_local_char(sendbuf, count, recvbuf, recvcounts, displs, comm); 325 } 326 else 327 { 328 printf("MPI_Gatherv Datatype not supported!\n"); 329 exit(0); 330 } 331 } 332 333 int MPI_Gatherv_local_int(const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm) 334 { 335 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 336 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 337 338 int *buffer = comm.my_buffer->buf_int; 339 int *send_buf = static_cast<int*>(const_cast<void*>(sendbuf)); 340 int *recv_buf = static_cast<int*>(recvbuf); 341 342 if(my_rank == 0) 343 { 344 assert(count == recvcounts[0]); 345 copy(send_buf, send_buf+count, recv_buf + displs[0]); 346 } 347 348 for(int j=0; count!=0? j<count: j<count+1; j+=BUFFER_SIZE) 349 { 350 for(int k=1; k<num_ep; k++) 351 { 352 if(my_rank == k) 353 { 354 #pragma omp critical (write_to_buffer) 355 { 356 if(count!=0) copy(send_buf+j, send_buf + min(BUFFER_SIZE, count-j) , buffer); 357 #pragma omp flush 358 } 359 } 360 361 MPI_Barrier_local(comm); 362 363 if(my_rank == 0) 364 { 365 #pragma omp flush 366 #pragma omp critical (read_from_buffer) 367 { 368 copy(buffer, buffer+min(BUFFER_SIZE, recvcounts[k]-j), recv_buf+j+displs[k]); 369 } 370 } 371 372 MPI_Barrier_local(comm); 373 } 374 } 375 } 376 377 int MPI_Gatherv_local_float(const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm) 378 { 379 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 380 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 381 382 float *buffer = comm.my_buffer->buf_float; 383 float *send_buf = static_cast<float*>(const_cast<void*>(sendbuf)); 384 float *recv_buf = static_cast<float*>(recvbuf); 385 386 if(my_rank == 0) 387 { 388 assert(count == recvcounts[0]); 389 copy(send_buf, send_buf+count, recv_buf + displs[0]); 390 } 391 392 for(int j=0; count!=0? j<count: j<count+1; j+=BUFFER_SIZE) 393 { 394 for(int k=1; k<num_ep; k++) 395 { 396 if(my_rank == k) 397 { 398 #pragma omp critical (write_to_buffer) 399 { 400 if(count!=0) copy(send_buf+j, send_buf + min(BUFFER_SIZE, count-j) , buffer); 401 #pragma omp flush 402 } 403 } 404 405 MPI_Barrier_local(comm); 406 407 if(my_rank == 0) 408 { 409 #pragma omp flush 410 #pragma omp critical (read_from_buffer) 411 { 412 copy(buffer, buffer+min(BUFFER_SIZE, recvcounts[k]-j), recv_buf+j+displs[k]); 413 } 414 } 415 416 MPI_Barrier_local(comm); 417 } 418 } 419 } 420 421 int MPI_Gatherv_local_double(const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm) 422 { 423 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 424 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 425 426 double *buffer = comm.my_buffer->buf_double; 427 double *send_buf = static_cast<double*>(const_cast<void*>(sendbuf)); 428 double *recv_buf = static_cast<double*>(recvbuf); 429 430 if(my_rank == 0) 431 { 432 assert(count == recvcounts[0]); 433 copy(send_buf, send_buf+count, recv_buf + displs[0]); 434 } 435 436 for(int j=0; count!=0? j<count: j<count+1; j+=BUFFER_SIZE) 437 { 438 for(int k=1; k<num_ep; k++) 439 { 440 if(my_rank == k) 441 { 442 #pragma omp critical (write_to_buffer) 443 { 444 if(count!=0) copy(send_buf+j, send_buf + min(BUFFER_SIZE, count-j) , buffer); 445 #pragma omp flush 446 } 447 } 448 449 MPI_Barrier_local(comm); 450 451 if(my_rank == 0) 452 { 453 #pragma omp flush 454 #pragma omp critical (read_from_buffer) 455 { 456 copy(buffer, buffer+min(BUFFER_SIZE, recvcounts[k]-j), recv_buf+j+displs[k]); 457 } 458 } 459 460 MPI_Barrier_local(comm); 461 } 462 } 463 } 464 465 int MPI_Gatherv_local_long(const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm) 466 { 467 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 468 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 469 470 long *buffer = comm.my_buffer->buf_long; 471 long *send_buf = static_cast<long*>(const_cast<void*>(sendbuf)); 472 long *recv_buf = static_cast<long*>(recvbuf); 473 474 if(my_rank == 0) 475 { 476 assert(count == recvcounts[0]); 477 copy(send_buf, send_buf+count, recv_buf + displs[0]); 478 } 479 480 for(int j=0; count!=0? j<count: j<count+1; j+=BUFFER_SIZE) 481 { 482 for(int k=1; k<num_ep; k++) 483 { 484 if(my_rank == k) 485 { 486 #pragma omp critical (write_to_buffer) 487 { 488 if(count!=0)copy(send_buf+j, send_buf + min(BUFFER_SIZE, count-j) , buffer); 489 #pragma omp flush 490 } 491 } 492 493 MPI_Barrier_local(comm); 494 495 if(my_rank == 0) 496 { 497 #pragma omp flush 498 #pragma omp critical (read_from_buffer) 499 { 500 copy(buffer, buffer+min(BUFFER_SIZE, recvcounts[k]-j), recv_buf+j+displs[k]); 501 } 502 } 503 504 MPI_Barrier_local(comm); 505 } 506 } 507 } 508 509 int MPI_Gatherv_local_ulong(const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm) 510 { 511 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 512 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 513 514 unsigned long *buffer = comm.my_buffer->buf_ulong; 515 unsigned long *send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf)); 516 unsigned long *recv_buf = static_cast<unsigned long*>(recvbuf); 517 518 if(my_rank == 0) 519 { 520 assert(count == recvcounts[0]); 521 copy(send_buf, send_buf+count, recv_buf + displs[0]); 522 } 523 524 for(int j=0; count!=0? j<count: j<count+1; j+=BUFFER_SIZE) 525 { 526 for(int k=1; k<num_ep; k++) 527 { 528 if(my_rank == k) 529 { 530 #pragma omp critical (write_to_buffer) 531 { 532 if(count!=0) copy(send_buf+j, send_buf + min(BUFFER_SIZE, count-j) , buffer); 533 #pragma omp flush 534 } 535 } 536 537 MPI_Barrier_local(comm); 538 539 if(my_rank == 0) 540 { 541 #pragma omp flush 542 #pragma omp critical (read_from_buffer) 543 { 544 copy(buffer, buffer+min(BUFFER_SIZE, recvcounts[k]-j), recv_buf+j+displs[k]); 545 } 546 } 547 548 MPI_Barrier_local(comm); 549 } 550 } 551 } 552 553 int MPI_Gatherv_local_char(const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm) 554 { 555 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 556 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 557 558 char *buffer = comm.my_buffer->buf_char; 559 char *send_buf = static_cast<char*>(const_cast<void*>(sendbuf)); 560 char *recv_buf = static_cast<char*>(recvbuf); 561 562 if(my_rank == 0) 563 { 564 assert(count == recvcounts[0]); 565 copy(send_buf, send_buf+count, recv_buf + displs[0]); 566 } 567 568 for(int j=0; count!=0? j<count: j<count+1; j+=BUFFER_SIZE) 569 { 570 for(int k=1; k<num_ep; k++) 571 { 572 if(my_rank == k) 573 { 574 #pragma omp critical (write_to_buffer) 575 { 576 if(count!=0) copy(send_buf+j, send_buf + min(BUFFER_SIZE, count-j) , buffer); 577 #pragma omp flush 578 } 579 } 580 581 MPI_Barrier_local(comm); 582 583 if(my_rank == 0) 584 { 585 #pragma omp flush 586 #pragma omp critical (read_from_buffer) 587 { 588 copy(buffer, buffer+min(BUFFER_SIZE, recvcounts[k]-j), recv_buf+j+displs[k]); 589 } 590 } 591 592 MPI_Barrier_local(comm); 593 } 594 } 595 } 596 597 598 int MPI_Gatherv2(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 599 MPI_Datatype recvtype, int root, MPI_Comm comm) 600 { 601 602 if(!comm.is_ep && comm.mpi_comm) 603 { 604 ::MPI_Gatherv(const_cast<void*>(sendbuf), sendcount, static_cast< ::MPI_Datatype>(sendtype), recvbuf, const_cast<int*>(recvcounts), const_cast<int*>(displs), 605 static_cast< ::MPI_Datatype>(recvtype), root, static_cast< ::MPI_Comm>(comm.mpi_comm)); 606 return 0; 607 } 608 609 if(!comm.mpi_comm) return 0; 610 611 assert(static_cast< ::MPI_Datatype>(sendtype) == static_cast< ::MPI_Datatype>(recvtype)); 612 613 MPI_Datatype datatype = sendtype; 614 int count = sendcount; 615 616 int ep_rank, ep_rank_loc, mpi_rank; 617 int ep_size, num_ep, mpi_size; 618 619 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 620 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 621 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 622 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 623 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 624 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 625 626 627 628 if(ep_size == mpi_size) 629 return ::MPI_Gatherv(sendbuf, sendcount, static_cast< ::MPI_Datatype>(datatype), recvbuf, recvcounts, displs, 630 static_cast< ::MPI_Datatype>(datatype), root, static_cast< ::MPI_Comm>(comm.mpi_comm)); 631 632 if(ep_rank != root) 633 { 634 recvcounts = new int[ep_size]; 635 displs = new int[ep_size]; 636 } 637 638 MPI_Bcast(const_cast< int* >(displs), ep_size, MPI_INT, root, comm); 639 MPI_Bcast(const_cast< int* >(recvcounts), ep_size, MPI_INT, root, comm); 640 641 642 int recv_plus_displs[ep_size]; 643 for(int i=0; i<ep_size; i++) recv_plus_displs[i] = recvcounts[i] + displs[i]; 644 645 for(int j=0; j<mpi_size; j++) 646 { 647 if(recv_plus_displs[j*num_ep] < displs[j*num_ep+1] || 648 recv_plus_displs[j*num_ep + num_ep -1] < displs[j*num_ep + num_ep -2]) 649 { 650 Debug("Call special implementation of mpi_gatherv. 1st condition not OK\n"); 651 return MPI_Allgatherv_special(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm); 652 } 653 654 for(int i=1; i<num_ep-1; i++) 655 { 656 if(recv_plus_displs[j*num_ep+i] < displs[j*num_ep+i+1] || 657 recv_plus_displs[j*num_ep+i] < displs[j*num_ep+i-1]) 658 { 659 Debug("Call special implementation of mpi_gatherv. 2nd condition not OK\n"); 660 return MPI_Allgatherv_special(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm); 661 } 662 } 663 } 664 665 666 int root_mpi_rank = comm.rank_map->at(root).second; 667 int root_ep_loc = comm.rank_map->at(root).first; 668 669 670 ::MPI_Aint datasize, lb; 671 672 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 673 674 void *local_gather_recvbuf; 675 int buffer_size; 676 void *master_recvbuf; 677 678 if(ep_rank_loc == 0 && mpi_rank == root_mpi_rank && root_ep_loc != 0) 679 { 680 master_recvbuf = new void*[sizeof(recvbuf)]; 681 assert(root_ep_loc == 0); 682 } 683 684 if(ep_rank_loc==0) 685 { 686 buffer_size = *std::max_element(recv_plus_displs+ep_rank, recv_plus_displs+ep_rank+num_ep); 687 688 local_gather_recvbuf = new void*[datasize*buffer_size]; 689 } 690 691 MPI_Gatherv_local2(sendbuf, count, datatype, local_gather_recvbuf, recvcounts+ep_rank-ep_rank_loc, displs+ep_rank-ep_rank_loc, comm); 692 693 //MPI_Gather 694 if(ep_rank_loc == 0) 695 { 696 int *mpi_recvcnt= new int[mpi_size]; 697 int *mpi_displs= new int[mpi_size]; 698 699 int buff_start = *std::min_element(displs+ep_rank, displs+ep_rank+num_ep);; 700 int buff_end = buffer_size; 701 702 int mpi_sendcnt = buff_end - buff_start; 703 704 705 ::MPI_Gather(&mpi_sendcnt, 1, MPI_INT, mpi_recvcnt, 1, MPI_INT, root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 706 ::MPI_Gather(&buff_start, 1, MPI_INT, mpi_displs, 1, MPI_INT, root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 707 708 if(root_ep_loc == 0) 709 { ::MPI_Gatherv(local_gather_recvbuf + datasize*buff_start, mpi_sendcnt, static_cast< ::MPI_Datatype>(datatype), recvbuf, mpi_recvcnt, 710 mpi_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 711 } 712 else // gatherv to master_recvbuf 713 { ::MPI_Gatherv(local_gather_recvbuf + datasize*buff_start, mpi_sendcnt, static_cast< ::MPI_Datatype>(datatype), master_recvbuf, mpi_recvcnt, 714 mpi_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 715 } 716 717 delete[] mpi_recvcnt; 718 delete[] mpi_displs; 719 } 720 721 int global_min_displs = *std::min_element(displs, displs+ep_size); 722 int global_recvcnt = *std::max_element(recv_plus_displs, recv_plus_displs+ep_size); 723 724 725 if(root_ep_loc != 0 && mpi_rank == root_mpi_rank) // root is not master, master send to root and root receive from master 726 { 727 innode_memcpy(0, master_recvbuf+datasize*global_min_displs, root_ep_loc, recvbuf+datasize*global_min_displs, global_recvcnt, datatype, comm); 728 if(ep_rank_loc == 0) delete[] master_recvbuf; 729 } 730 731 732 733 if(ep_rank_loc==0) 734 { 735 if(datatype == MPI_INT) 736 { 737 delete[] static_cast<int*>(local_gather_recvbuf); 738 } 739 else if(datatype == MPI_FLOAT) 740 { 741 delete[] static_cast<float*>(local_gather_recvbuf); 742 } 743 else if(datatype == MPI_DOUBLE) 744 { 745 delete[] static_cast<double*>(local_gather_recvbuf); 746 } 747 else if(datatype == MPI_LONG) 748 { 749 delete[] static_cast<long*>(local_gather_recvbuf); 750 } 751 else if(datatype == MPI_UNSIGNED_LONG) 752 { 753 delete[] static_cast<unsigned long*>(local_gather_recvbuf); 754 } 755 else // if(datatype == MPI_CHAR) 756 { 757 delete[] static_cast<char*>(local_gather_recvbuf); 758 } 759 } 760 else 761 { 762 delete[] recvcounts; 763 delete[] displs; 764 } 765 return 0; 766 } 767 768 769 770 int MPI_Allgatherv2(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 771 MPI_Datatype recvtype, MPI_Comm comm) 772 { 773 774 if(!comm.is_ep && comm.mpi_comm) 775 { 776 ::MPI_Allgatherv(sendbuf, sendcount, static_cast< ::MPI_Datatype>(sendtype), recvbuf, recvcounts, displs, 777 static_cast< ::MPI_Datatype>(recvtype), static_cast< ::MPI_Comm>(comm.mpi_comm)); 778 return 0; 779 } 780 781 if(!comm.mpi_comm) return 0; 782 783 assert(static_cast< ::MPI_Datatype>(sendtype) == static_cast< ::MPI_Datatype>(recvtype)); 784 785 786 MPI_Datatype datatype = sendtype; 787 int count = sendcount; 788 789 int ep_rank, ep_rank_loc, mpi_rank; 790 int ep_size, num_ep, mpi_size; 791 792 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 793 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 794 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 795 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 796 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 797 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 798 799 if(ep_size == mpi_size) // needed by servers 800 return ::MPI_Allgatherv(sendbuf, sendcount, static_cast< ::MPI_Datatype>(datatype), recvbuf, recvcounts, displs, 801 static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Comm>(comm.mpi_comm)); 802 803 int recv_plus_displs[ep_size]; 804 for(int i=0; i<ep_size; i++) recv_plus_displs[i] = recvcounts[i] + displs[i]; 805 806 807 for(int j=0; j<mpi_size; j++) 808 { 809 if(recv_plus_displs[j*num_ep] < displs[j*num_ep+1] || 810 recv_plus_displs[j*num_ep + num_ep -1] < displs[j*num_ep + num_ep -2]) 811 { 812 printf("proc %d/%d Call special implementation of mpi_allgatherv.\n", ep_rank, ep_size); 813 for(int k=0; k<ep_size; k++) 814 printf("recv_plus_displs[%d] = %d\t displs[%d] = %d\n", k, recv_plus_displs[k], k, displs[k]); 815 816 return MPI_Allgatherv_special(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm); 817 } 818 819 for(int i=1; i<num_ep-1; i++) 820 { 821 if(recv_plus_displs[j*num_ep+i] < displs[j*num_ep+i+1] || 822 recv_plus_displs[j*num_ep+i] < displs[j*num_ep+i-1]) 823 { 824 printf("proc %d/%d Call special implementation of mpi_allgatherv.\n", ep_rank, ep_size); 825 return MPI_Allgatherv_special(sendbuf, sendcount, sendtype, recvbuf, recvcounts, displs, recvtype, comm); 826 } 827 } 828 } 829 830 ::MPI_Aint datasize, lb; 831 832 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 833 834 void *local_gather_recvbuf; 835 int buffer_size; 836 837 if(ep_rank_loc==0) 838 { 839 buffer_size = *std::max_element(recv_plus_displs+ep_rank, recv_plus_displs+ep_rank+num_ep); 840 841 local_gather_recvbuf = new void*[datasize*buffer_size]; 842 } 843 844 // local gather to master 845 MPI_Gatherv_local2(sendbuf, count, datatype, local_gather_recvbuf, recvcounts+ep_rank-ep_rank_loc, displs+ep_rank-ep_rank_loc, comm); 846 847 //MPI_Gather 848 if(ep_rank_loc == 0) 849 { 850 int *mpi_recvcnt= new int[mpi_size]; 851 int *mpi_displs= new int[mpi_size]; 852 853 int buff_start = *std::min_element(displs+ep_rank, displs+ep_rank+num_ep);; 854 int buff_end = buffer_size; 855 856 int mpi_sendcnt = buff_end - buff_start; 857 858 859 ::MPI_Allgather(&mpi_sendcnt, 1, MPI_INT, mpi_recvcnt, 1, MPI_INT, static_cast< ::MPI_Comm>(comm.mpi_comm)); 860 ::MPI_Allgather(&buff_start, 1, MPI_INT, mpi_displs, 1, MPI_INT, static_cast< ::MPI_Comm>(comm.mpi_comm)); 861 862 863 ::MPI_Allgatherv((char*)local_gather_recvbuf + datasize*buff_start, mpi_sendcnt, static_cast< ::MPI_Datatype>(datatype), recvbuf, mpi_recvcnt, 864 mpi_displs, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Comm>(comm.mpi_comm)); 865 866 delete[] mpi_recvcnt; 867 delete[] mpi_displs; 868 } 869 870 int global_min_displs = *std::min_element(displs, displs+ep_size); 871 int global_recvcnt = *std::max_element(recv_plus_displs, recv_plus_displs+ep_size); 872 873 MPI_Bcast_local2(recvbuf+datasize*global_min_displs, global_recvcnt, datatype, comm); 874 875 if(ep_rank_loc==0) 876 { 877 if(datatype == MPI_INT) 878 { 879 delete[] static_cast<int*>(local_gather_recvbuf); 880 } 881 else if(datatype == MPI_FLOAT) 882 { 883 delete[] static_cast<float*>(local_gather_recvbuf); 884 } 885 else if(datatype == MPI_DOUBLE) 886 { 887 delete[] static_cast<double*>(local_gather_recvbuf); 888 } 889 else if(datatype == MPI_LONG) 890 { 891 delete[] static_cast<long*>(local_gather_recvbuf); 892 } 893 else if(datatype == MPI_UNSIGNED_LONG) 894 { 895 delete[] static_cast<unsigned long*>(local_gather_recvbuf); 896 } 897 else // if(datatype == MPI_CHAR) 898 { 899 delete[] static_cast<char*>(local_gather_recvbuf); 900 } 901 } 902 } 903 904 int MPI_Gatherv_special(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 905 MPI_Datatype recvtype, int root, MPI_Comm comm) 906 { 907 int ep_rank, ep_rank_loc, mpi_rank; 908 int ep_size, num_ep, mpi_size; 909 910 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 911 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 912 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 913 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 914 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 915 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 916 917 int root_mpi_rank = comm.rank_map->at(root).second; 918 int root_ep_loc = comm.rank_map->at(root).first; 919 920 ::MPI_Aint datasize, lb; 921 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(sendtype), &lb, &datasize); 922 923 void *local_gather_recvbuf; 924 int buffer_size; 925 926 int *local_displs = new int[num_ep]; 927 int *local_rvcnts = new int[num_ep]; 928 for(int i=0; i<num_ep; i++) local_rvcnts[i] = recvcounts[ep_rank-ep_rank_loc + i]; 929 local_displs[0] = 0; 930 for(int i=1; i<num_ep; i++) local_displs[i] = local_displs[i-1] + local_rvcnts[i-1]; 931 932 if(ep_rank_loc==0) 933 { 934 buffer_size = local_displs[num_ep-1] + recvcounts[ep_rank+num_ep-1]; 935 local_gather_recvbuf = new void*[datasize*buffer_size]; 936 } 937 938 // local gather to master 939 MPI_Gatherv_local2(sendbuf, sendcount, sendtype, local_gather_recvbuf, local_rvcnts, local_displs, comm); // all sendbuf gathered to master 940 941 int **mpi_recvcnts = new int*[num_ep]; 942 int **mpi_displs = new int*[num_ep]; 943 for(int i=0; i<num_ep; i++) 944 { 945 mpi_recvcnts[i] = new int[mpi_size]; 946 mpi_displs[i] = new int[mpi_size]; 947 for(int j=0; j<mpi_size; j++) 948 { 949 mpi_recvcnts[i][j] = recvcounts[j*num_ep + i]; 950 mpi_displs[i][j] = displs[j*num_ep + i]; 951 } 952 } 953 954 void *master_recvbuf; 955 if(ep_rank_loc == 0 && mpi_rank == root_mpi_rank && root_ep_loc != 0) master_recvbuf = new void*[sizeof(recvbuf)]; 956 957 if(ep_rank_loc == 0 && root_ep_loc == 0) // master in MPI_Allgatherv loop 958 for(int i=0; i<num_ep; i++) 959 { 960 ::MPI_Gatherv(local_gather_recvbuf + datasize*local_displs[i], recvcounts[ep_rank+i], static_cast< ::MPI_Datatype>(sendtype), recvbuf, mpi_recvcnts[i], mpi_displs[i], 961 static_cast< ::MPI_Datatype>(recvtype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 962 } 963 if(ep_rank_loc == 0 && root_ep_loc != 0) 964 for(int i=0; i<num_ep; i++) 965 { 966 ::MPI_Gatherv(local_gather_recvbuf + datasize*local_displs[i], recvcounts[ep_rank+i], static_cast< ::MPI_Datatype>(sendtype), master_recvbuf, mpi_recvcnts[i], mpi_displs[i], 967 static_cast< ::MPI_Datatype>(recvtype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 968 } 969 970 971 if(root_ep_loc != 0 && mpi_rank == root_mpi_rank) // root is not master, master send to root and root receive from master 972 { 973 for(int i=0; i<ep_size; i++) 974 innode_memcpy(0, master_recvbuf + datasize*displs[i], root_ep_loc, recvbuf + datasize*displs[i], recvcounts[i], sendtype, comm); 975 976 if(ep_rank_loc == 0) delete[] master_recvbuf; 977 } 978 979 980 delete[] local_displs; 981 delete[] local_rvcnts; 982 for(int i=0; i<num_ep; i++) { delete[] mpi_recvcnts[i]; 983 delete[] mpi_displs[i]; } 984 delete[] mpi_recvcnts; 985 delete[] mpi_displs; 986 if(ep_rank_loc==0) 987 { 988 if(sendtype == MPI_INT) 989 { 990 delete[] static_cast<int*>(local_gather_recvbuf); 991 } 992 else if(sendtype == MPI_FLOAT) 993 { 994 delete[] static_cast<float*>(local_gather_recvbuf); 995 } 996 else if(sendtype == MPI_DOUBLE) 997 { 998 delete[] static_cast<double*>(local_gather_recvbuf); 999 } 1000 else if(sendtype == MPI_LONG) 1001 { 1002 delete[] static_cast<long*>(local_gather_recvbuf); 1003 } 1004 else if(sendtype == MPI_UNSIGNED_LONG) 1005 { 1006 delete[] static_cast<unsigned long*>(local_gather_recvbuf); 1007 } 1008 else // if(sendtype == MPI_CHAR) 1009 { 1010 delete[] static_cast<char*>(local_gather_recvbuf); 1011 } 1012 } 1013 } 1014 1015 int MPI_Allgatherv_special(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 1016 MPI_Datatype recvtype, MPI_Comm comm) 1017 { 1018 int ep_rank, ep_rank_loc, mpi_rank; 1019 int ep_size, num_ep, mpi_size; 1020 1021 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 1022 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 1023 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 1024 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 1025 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 1026 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 1027 1028 1029 ::MPI_Aint datasize, lb; 1030 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(sendtype), &lb, &datasize); 1031 1032 void *local_gather_recvbuf; 1033 int buffer_size; 1034 1035 int *local_displs = new int[num_ep]; 1036 int *local_rvcnts = new int[num_ep]; 1037 for(int i=0; i<num_ep; i++) local_rvcnts[i] = recvcounts[ep_rank-ep_rank_loc + i]; 1038 local_displs[0] = 0; 1039 for(int i=1; i<num_ep; i++) local_displs[i] = local_displs[i-1] + local_rvcnts[i-1]; 1040 1041 if(ep_rank_loc==0) 1042 { 1043 buffer_size = local_displs[num_ep-1] + recvcounts[ep_rank+num_ep-1]; 1044 local_gather_recvbuf = new void*[datasize*buffer_size]; 1045 } 1046 1047 // local gather to master 1048 MPI_Gatherv_local2(sendbuf, sendcount, sendtype, local_gather_recvbuf, local_rvcnts, local_displs, comm); // all sendbuf gathered to master 1049 1050 int **mpi_recvcnts = new int*[num_ep]; 1051 int **mpi_displs = new int*[num_ep]; 1052 for(int i=0; i<num_ep; i++) 1053 { 1054 mpi_recvcnts[i] = new int[mpi_size]; 1055 mpi_displs[i] = new int[mpi_size]; 1056 for(int j=0; j<mpi_size; j++) 1057 { 1058 mpi_recvcnts[i][j] = recvcounts[j*num_ep + i]; 1059 mpi_displs[i][j] = displs[j*num_ep + i]; 1060 } 1061 } 1062 1063 if(ep_rank_loc == 0) // master in MPI_Allgatherv loop 1064 for(int i=0; i<num_ep; i++) 1065 { 1066 ::MPI_Allgatherv(local_gather_recvbuf + datasize*local_displs[i], recvcounts[ep_rank+i], static_cast< ::MPI_Datatype>(sendtype), recvbuf, mpi_recvcnts[i], mpi_displs[i], 1067 static_cast< ::MPI_Datatype>(recvtype), static_cast< ::MPI_Comm>(comm.mpi_comm)); 1068 } 1069 1070 for(int i=0; i<ep_size; i++) 1071 MPI_Bcast_local2(recvbuf + datasize*displs[i], recvcounts[i], recvtype, comm); 1072 1073 1074 delete[] local_displs; 1075 delete[] local_rvcnts; 1076 for(int i=0; i<num_ep; i++) { delete[] mpi_recvcnts[i]; 1077 delete[] mpi_displs[i]; } 1078 delete[] mpi_recvcnts; 1079 delete[] mpi_displs; 1080 if(ep_rank_loc==0) 1081 { 1082 if(sendtype == MPI_INT) 1083 { 1084 delete[] static_cast<int*>(local_gather_recvbuf); 1085 } 1086 else if(sendtype == MPI_FLOAT) 1087 { 1088 delete[] static_cast<float*>(local_gather_recvbuf); 1089 } 1090 else if(sendtype == MPI_DOUBLE) 1091 { 1092 delete[] static_cast<double*>(local_gather_recvbuf); 1093 } 1094 else if(sendtype == MPI_LONG) 1095 { 1096 delete[] static_cast<long*>(local_gather_recvbuf); 1097 } 1098 else if(sendtype == MPI_UNSIGNED_LONG) 1099 { 1100 delete[] static_cast<unsigned long*>(local_gather_recvbuf); 1101 } 1102 else // if(sendtype == MPI_CHAR) 1103 { 1104 delete[] static_cast<char*>(local_gather_recvbuf); 1105 } 1106 } 1107 } 1108 1109 188 1110 } -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_lib.cpp
r1287 r1289 63 63 64 64 65 int innode_memcpy(int sender, const void* sendbuf, int receiver, void* recvbuf, int count, MPI_Datatype datatype, MPI_Comm comm) 66 { 67 int ep_rank, ep_rank_loc, mpi_rank; 68 int ep_size, num_ep, mpi_size; 69 70 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 71 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 72 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 73 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 74 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 75 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 76 77 78 79 if(datatype == MPI_INT) 80 { 81 82 int* send_buf = static_cast<int*>(const_cast<void*>(sendbuf)); 83 int* recv_buf = static_cast<int*>(recvbuf); 84 int* buffer = comm.my_buffer->buf_int; 85 86 for(int j=0; j<count; j+=BUFFER_SIZE) 87 { 88 if(ep_rank_loc == sender) 89 { 90 #pragma omp critical (write_to_buffer) 91 { 92 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 93 } 94 #pragma omp flush 95 } 96 97 MPI_Barrier_local(comm); 98 99 100 if(ep_rank_loc == receiver) 101 { 102 #pragma omp flush 103 #pragma omp critical (read_from_buffer) 104 { 105 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 106 } 107 } 108 109 MPI_Barrier_local(comm); 110 } 111 } 112 else if(datatype == MPI_FLOAT) 113 { 114 115 float* send_buf = static_cast<float*>(const_cast<void*>(sendbuf)); 116 float* recv_buf = static_cast<float*>(recvbuf); 117 float* buffer = comm.my_buffer->buf_float; 118 119 for(int j=0; j<count; j+=BUFFER_SIZE) 120 { 121 if(ep_rank_loc == sender) 122 { 123 #pragma omp critical (write_to_buffer) 124 { 125 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 126 } 127 #pragma omp flush 128 } 129 130 MPI_Barrier_local(comm); 131 132 133 if(ep_rank_loc == receiver) 134 { 135 #pragma omp flush 136 #pragma omp critical (read_from_buffer) 137 { 138 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 139 } 140 } 141 142 MPI_Barrier_local(comm); 143 } 144 } 145 else if(datatype == MPI_DOUBLE) 146 { 147 148 149 double* send_buf = static_cast<double*>(const_cast<void*>(sendbuf)); 150 double* recv_buf = static_cast<double*>(recvbuf); 151 double* buffer = comm.my_buffer->buf_double; 152 153 for(int j=0; j<count; j+=BUFFER_SIZE) 154 { 155 if(ep_rank_loc == sender) 156 { 157 #pragma omp critical (write_to_buffer) 158 { 159 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 160 } 161 #pragma omp flush 162 } 163 164 MPI_Barrier_local(comm); 165 166 167 if(ep_rank_loc == receiver) 168 { 169 #pragma omp flush 170 #pragma omp critical (read_from_buffer) 171 { 172 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 173 } 174 } 175 176 MPI_Barrier_local(comm); 177 } 178 } 179 else if(datatype == MPI_LONG) 180 { 181 long* send_buf = static_cast<long*>(const_cast<void*>(sendbuf)); 182 long* recv_buf = static_cast<long*>(recvbuf); 183 long* buffer = comm.my_buffer->buf_long; 184 185 for(int j=0; j<count; j+=BUFFER_SIZE) 186 { 187 if(ep_rank_loc == sender) 188 { 189 #pragma omp critical (write_to_buffer) 190 { 191 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 192 } 193 #pragma omp flush 194 } 195 196 MPI_Barrier_local(comm); 197 198 199 if(ep_rank_loc == receiver) 200 { 201 #pragma omp flush 202 #pragma omp critical (read_from_buffer) 203 { 204 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 205 } 206 } 207 208 MPI_Barrier_local(comm); 209 } 210 } 211 else if(datatype == MPI_UNSIGNED_LONG) 212 { 213 unsigned long* send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf)); 214 unsigned long* recv_buf = static_cast<unsigned long*>(recvbuf); 215 unsigned long* buffer = comm.my_buffer->buf_ulong; 216 217 for(int j=0; j<count; j+=BUFFER_SIZE) 218 { 219 if(ep_rank_loc == sender) 220 { 221 #pragma omp critical (write_to_buffer) 222 { 223 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 224 } 225 #pragma omp flush 226 } 227 228 MPI_Barrier_local(comm); 229 230 231 if(ep_rank_loc == receiver) 232 { 233 #pragma omp flush 234 #pragma omp critical (read_from_buffer) 235 { 236 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 237 } 238 } 239 240 MPI_Barrier_local(comm); 241 } 242 } 243 else if(datatype == MPI_CHAR) 244 { 245 char* send_buf = static_cast<char*>(const_cast<void*>(sendbuf)); 246 char* recv_buf = static_cast<char*>(recvbuf); 247 char* buffer = comm.my_buffer->buf_char; 248 249 for(int j=0; j<count; j+=BUFFER_SIZE) 250 { 251 if(ep_rank_loc == sender) 252 { 253 #pragma omp critical (write_to_buffer) 254 { 255 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 256 } 257 #pragma omp flush 258 } 259 260 MPI_Barrier_local(comm); 261 262 263 if(ep_rank_loc == receiver) 264 { 265 #pragma omp flush 266 #pragma omp critical (read_from_buffer) 267 { 268 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 269 } 270 } 271 272 MPI_Barrier_local(comm); 273 } 274 } 275 else 276 { 277 printf("datatype not supported!!\n"); 278 exit(1); 279 } 280 return 0; 281 } 282 283 65 284 int MPI_Get_count(const MPI_Status *status, MPI_Datatype datatype, int *count) 66 285 { 67 286 /* 287 ::MPI_Aint datasize, char_size, lb; 288 289 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 290 ::MPI_Type_get_extent(MPI_CHAR, &lb, &char_size); 291 292 *count = status->char_count / ( datasize/ char_size); 293 294 //printf("MPI_Get_count, status_count = %d\n", *count); 295 return 0; 296 */ 68 297 ::MPI_Status *mpi_status = static_cast< ::MPI_Status* >(status->mpi_status); 69 298 ::MPI_Datatype mpi_datatype = static_cast< ::MPI_Datatype >(datatype); -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_lib_collective.hpp
r1287 r1289 17 17 18 18 int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm); 19 20 19 int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); 21 20 … … 23 22 24 23 int MPI_Scan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); 25 26 24 int MPI_Exscan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); 27 25 … … 33 31 int MPI_Gatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 34 32 MPI_Datatype recvtype, int root, MPI_Comm comm); 33 int MPI_Gatherv_special(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 34 MPI_Datatype recvtype, int root, MPI_Comm comm); 35 35 int MPI_Allgatherv(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 36 36 MPI_Datatype recvtype, MPI_Comm comm); 37 37 38 int MPI_Allgatherv_special(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, const int recvcounts[], const int displs[], 39 MPI_Datatype recvtype, MPI_Comm comm); 40 38 41 39 42 int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm); 40 41 43 int MPI_Scatterv(const void *sendbuf, const int sendcounts[], const int displs[], MPI_Datatype sendtype, void *recvbuf, int recvcount, 42 44 MPI_Datatype recvtype, int root, MPI_Comm comm); -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_lib_fortran.hpp
r1287 r1289 6 6 namespace ep_lib 7 7 { 8 9 // #ifdef _intelmpi 10 11 // MPI_Fint MPI_Comm_c2f(MPI_Comm comm); 12 // MPI_Comm MPI_Comm_f2c(MPI_Fint comm); 13 14 // #elif _openmpi 15 16 // int MPI_Comm_c2f(MPI_Comm comm); 17 // ep_lib::MPI_Comm MPI_Comm_f2c(MPI_Fint comm); 18 19 // #endif 8 20 9 21 int EP_Comm_c2f(MPI_Comm comm); -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_lib_local.hpp
r1287 r1289 30 30 MPI_Datatype recvtype, int local_root, MPI_Comm comm); 31 31 32 33 34 int MPI_Reduce_local2 (const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); 35 int MPI_Reduce_local_int (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 36 int MPI_Reduce_local_float (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 37 int MPI_Reduce_local_double(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 38 int MPI_Reduce_local_long (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 39 int MPI_Reduce_local_ulong (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 40 int MPI_Reduce_local_char (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 41 42 43 int MPI_Scan_local2 (const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); 44 int MPI_Scan_local_int (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 45 int MPI_Scan_local_float (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 46 int MPI_Scan_local_double(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 47 int MPI_Scan_local_long (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 48 int MPI_Scan_local_ulong (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 49 int MPI_Scan_local_char (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 50 51 int MPI_Exscan_local2 (const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm); 52 int MPI_Exscan_local_int (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 53 int MPI_Exscan_local_float (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 54 int MPI_Exscan_local_double(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 55 int MPI_Exscan_local_long (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 56 int MPI_Exscan_local_ulong (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 57 int MPI_Exscan_local_char (const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm); 58 59 int MPI_Bcast_local2 (void *buffer, int count, MPI_Datatype datatype, MPI_Comm comm); 60 int MPI_Bcast_local_int (void *buffer, int count, MPI_Comm comm); 61 int MPI_Bcast_local_float (void *buffer, int count, MPI_Comm comm); 62 int MPI_Bcast_local_double(void *buffer, int count, MPI_Comm comm); 63 int MPI_Bcast_local_long (void *buffer, int count, MPI_Comm comm); 64 int MPI_Bcast_local_ulong (void *buffer, int count, MPI_Comm comm); 65 int MPI_Bcast_local_char (void *buffer, int count, MPI_Comm comm); 66 67 int MPI_Gather_local2 (const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, MPI_Comm comm); 68 int MPI_Gather_local_int (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 69 int MPI_Gather_local_float (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 70 int MPI_Gather_local_double(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 71 int MPI_Gather_local_long (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 72 int MPI_Gather_local_ulong (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 73 int MPI_Gather_local_char (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 74 75 76 int MPI_Gatherv_local2 (const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, 77 const int recvcounts[], const int displs[], MPI_Comm comm); 78 int MPI_Gatherv_local_int (const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm); 79 int MPI_Gatherv_local_float (const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm); 80 int MPI_Gatherv_local_double(const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm); 81 int MPI_Gatherv_local_long (const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm); 82 int MPI_Gatherv_local_ulong (const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm); 83 int MPI_Gatherv_local_char (const void *sendbuf, int count, void *recvbuf, const int recvcounts[], const int displs[], MPI_Comm comm); 84 85 int MPI_Scatter_local2 (const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, MPI_Comm comm); 86 int MPI_Scatter_local_int (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 87 int MPI_Scatter_local_float (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 88 int MPI_Scatter_local_double(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 89 int MPI_Scatter_local_long (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 90 int MPI_Scatter_local_ulong (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 91 int MPI_Scatter_local_char (const void *sendbuf, int count, void *recvbuf, MPI_Comm comm); 92 93 int MPI_Scatterv_local2 (const void *sendbuf, const int sendcounts[], const int displs[], MPI_Datatype datatype, void *recvbuf, MPI_Comm comm); 94 int MPI_Scatterv_local_int (const void *sendbuf, const int sendcounts[], const int displs[], void *recvbuf, MPI_Comm comm); 95 int MPI_Scatterv_local_float (const void *sendbuf, const int sendcounts[], const int displs[], void *recvbuf, MPI_Comm comm); 96 int MPI_Scatterv_local_double(const void *sendbuf, const int sendcounts[], const int displs[], void *recvbuf, MPI_Comm comm); 97 int MPI_Scatterv_local_long (const void *sendbuf, const int sendcounts[], const int displs[], void *recvbuf, MPI_Comm comm); 98 int MPI_Scatterv_local_ulong (const void *sendbuf, const int sendcounts[], const int displs[], void *recvbuf, MPI_Comm comm); 99 int MPI_Scatterv_local_char (const void *sendbuf, const int sendcounts[], const int displs[], void *recvbuf, MPI_Comm comm); 100 101 int innode_memcpy(int sender, const void* sendbuf, int receiver, void* recvbuf, int count, MPI_Datatype datatype, MPI_Comm comm); 102 32 103 int MPI_Barrier_local(MPI_Comm comm); 33 104 -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_merge.cpp
r1287 r1289 42 42 if(local_ep_rank == 0) 43 43 { 44 MPI_Status status[2]; 45 MPI_Request request[2]; 46 MPI_Isend(&local_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &request[0]); 47 MPI_Irecv(&remote_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &request[1]); 48 49 MPI_Waitall(2, request, status); 44 MPI_Status status; 45 MPI_Request req_s, req_r; 46 MPI_Isend(&local_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &req_s); 47 MPI_Irecv(&remote_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &req_r); 48 49 MPI_Wait(&req_s, &status); 50 MPI_Wait(&req_r, &status); 50 51 } 51 52 … … 53 54 MPI_Bcast(&remote_high, 1, MPI_INT, 0, *(inter_comm.ep_comm_ptr->intercomm->local_comm)); 54 55 56 // printf("%d, %d, %d, %d\n", local_ep_size, remote_ep_size, local_high, remote_high); 55 57 56 58 … … 84 86 if(intra_ep_rank_loc == 0) 85 87 { 86 ::MPI_Bcast(reorder, intra_ep_size, static_cast< ::MPI_Datatype> (MPI_INT), 0, static_cast< ::MPI_Comm>(newintracomm->mpi_comm));88 ::MPI_Bcast(reorder, intra_ep_size, MPI_INT, 0, static_cast< ::MPI_Comm>(newintracomm->mpi_comm)); 87 89 88 90 vector< pair<int, int> > tmp_rank_map(intra_ep_size); … … 128 130 Debug("intercomm_merge kernel\n"); 129 131 130 int ep_rank_loc; 131 int num_ep; 132 132 int ep_rank, ep_rank_loc, mpi_rank; 133 int ep_size, num_ep, mpi_size; 134 135 ep_rank = inter_comm.ep_comm_ptr->size_rank_info[0].first; 133 136 ep_rank_loc = inter_comm.ep_comm_ptr->size_rank_info[1].first; 137 mpi_rank = inter_comm.ep_comm_ptr->size_rank_info[2].first; 138 ep_size = inter_comm.ep_comm_ptr->size_rank_info[0].second; 134 139 num_ep = inter_comm.ep_comm_ptr->size_rank_info[1].second; 135 136 140 mpi_size = inter_comm.ep_comm_ptr->size_rank_info[2].second; 141 142 143 int local_ep_rank, local_ep_rank_loc, local_mpi_rank; 144 int local_ep_size, local_num_ep, local_mpi_size; 145 146 147 local_ep_rank = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[0].first; 148 local_ep_rank_loc = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[1].first; 149 local_mpi_rank = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[2].first; 150 local_ep_size = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[0].second; 151 local_num_ep = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[1].second; 152 local_mpi_size = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[2].second; 137 153 138 154 int remote_ep_size = inter_comm.ep_comm_ptr->intercomm->remote_rank_map->size(); 139 155 156 int local_high = high; 157 int remote_high; 140 158 141 159 MPI_Barrier(inter_comm); 160 161 // if(local_ep_rank == 0 && high == false) 162 // { 163 // MPI_Status status; 164 // MPI_Send(&local_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm); 165 // MPI_Recv(&remote_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &status); 166 // } 167 // 168 // if(local_ep_rank == 0 && high == true) 169 // { 170 // MPI_Status status; 171 // MPI_Recv(&remote_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &status); 172 // MPI_Send(&local_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm); 173 // } 174 175 if(local_ep_rank == 0) 176 { 177 MPI_Status status; 178 MPI_Request req_s, req_r; 179 MPI_Isend(&local_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &req_s); 180 MPI_Irecv(&remote_high, 1, MPI_INT, 0, inter_comm.ep_comm_ptr->intercomm->intercomm_tag, inter_comm, &req_r); 181 182 MPI_Wait(&req_s, &status); 183 MPI_Wait(&req_r, &status); 184 } 185 186 MPI_Bcast(&remote_high, 1, MPI_INT, 0, *(inter_comm.ep_comm_ptr->intercomm->local_comm)); 187 188 int intercomm_high; 189 if(ep_rank == 0) intercomm_high = local_high; 190 MPI_Bcast(&intercomm_high, 1, MPI_INT, 0, inter_comm); 191 192 //printf("remote_ep_size = %d, local_high = %d, remote_high = %d, intercomm_high = %d\n", remote_ep_size, local_high, remote_high, intercomm_high); 142 193 143 194 … … 150 201 ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm>(inter_comm.ep_comm_ptr->intercomm->mpi_inter_comm); 151 202 152 ::MPI_Intercomm_merge(mpi_comm, high, &mpi_intracomm);203 ::MPI_Intercomm_merge(mpi_comm, intercomm_high, &mpi_intracomm); 153 204 MPI_Info info; 154 205 MPI_Comm_create_endpoints(mpi_intracomm, num_ep, info, ep_intracomm); … … 158 209 } 159 210 211 212 160 213 MPI_Barrier_local(inter_comm); 161 214 162 int inter_rank; 163 MPI_Comm_rank(inter_comm, &inter_rank); 164 165 int my_ep_rank = high? inter_rank+remote_ep_size : inter_rank; 166 int my_ep_rank_loc = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[1].first; 167 int my_num_ep_loc = inter_comm.ep_comm_ptr->intercomm->local_comm->ep_comm_ptr->size_rank_info[1].second; 168 int my_num_ep_total = inter_comm.ep_comm_ptr->comm_list->mem_bridge[0].ep_comm_ptr->size_rank_info[1].second; 169 int my_ep_size = inter_comm.ep_comm_ptr->comm_list->mem_bridge[0].ep_comm_ptr->size_rank_info[0].second; 170 171 int tmp_intra_ep_rank_loc = high?my_ep_rank_loc+my_num_ep_total-my_num_ep_loc: my_ep_rank_loc; 172 173 174 *newintracomm = inter_comm.ep_comm_ptr->comm_list->mem_bridge[tmp_intra_ep_rank_loc]; 175 176 int newintracomm_ep_rank = (*newintracomm).ep_comm_ptr->size_rank_info[0].first; 177 int newintracomm_ep_rank_loc = (*newintracomm).ep_comm_ptr->size_rank_info[1].first; 178 int newintracomm_mpi_rank = (*newintracomm).ep_comm_ptr->size_rank_info[2].first; 179 int newintracomm_ep_size = (*newintracomm).ep_comm_ptr->size_rank_info[0].second; 180 int newintracomm_num_ep = (*newintracomm).ep_comm_ptr->size_rank_info[1].second; 181 int newintracomm_mpi_size = (*newintracomm).ep_comm_ptr->size_rank_info[2].second; 182 183 184 int buf[3]; 185 buf[0] = my_ep_rank; 186 buf[1] = tmp_intra_ep_rank_loc; 187 buf[2] = newintracomm->ep_comm_ptr->size_rank_info[2].first; 188 189 // printf("my_ep_rank = %d, tmp_intra_ep_rank_loc = %d, mpi_rank = %d\n", my_ep_rank, tmp_intra_ep_rank_loc, newintracomm->ep_comm_ptr->size_rank_info[2].first); 190 191 int *rankmap_buf; 192 rankmap_buf = new int [3*my_ep_size]; 193 194 MPI_Allgather(buf, 3, MPI_INT, rankmap_buf, 3, MPI_INT, *newintracomm); 195 196 197 // printf(" ID = %d : rankmap_buf = (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d), (%d %d %d)\n", newintracomm_ep_rank, 198 // rankmap_buf[0], rankmap_buf[1], rankmap_buf[2], rankmap_buf[3], rankmap_buf[4], rankmap_buf[5], rankmap_buf[6], rankmap_buf[7], rankmap_buf[8], rankmap_buf[9], 199 // rankmap_buf[10], rankmap_buf[11], rankmap_buf[12], rankmap_buf[13], rankmap_buf[14], rankmap_buf[15], rankmap_buf[16], rankmap_buf[17], rankmap_buf[18], rankmap_buf[19], 200 // rankmap_buf[20], rankmap_buf[21], rankmap_buf[22], rankmap_buf[23], rankmap_buf[24], rankmap_buf[25], rankmap_buf[26], rankmap_buf[27], rankmap_buf[28], rankmap_buf[29], 201 // rankmap_buf[30], rankmap_buf[31], rankmap_buf[32], rankmap_buf[33], rankmap_buf[34], rankmap_buf[35], rankmap_buf[36], rankmap_buf[37], rankmap_buf[38], rankmap_buf[39], 202 // rankmap_buf[40], rankmap_buf[41], rankmap_buf[42], rankmap_buf[43], rankmap_buf[44], rankmap_buf[45], rankmap_buf[46], rankmap_buf[47]); 203 204 205 for(int i=0; i<newintracomm_ep_size; i++) 206 { 207 (*newintracomm).rank_map->at(rankmap_buf[3*i]).first = rankmap_buf[3*i+1]; 208 (*newintracomm).rank_map->at(rankmap_buf[3*i]).second = rankmap_buf[3*i+2]; 209 } 210 215 *newintracomm = inter_comm.ep_comm_ptr->comm_list->mem_bridge[ep_rank_loc]; 216 217 int my_ep_rank = local_high<remote_high? local_ep_rank: local_ep_rank+remote_ep_size; 218 219 int intra_ep_rank, intra_ep_rank_loc, intra_mpi_rank; 220 int intra_ep_size, intra_num_ep, intra_mpi_size; 221 222 intra_ep_rank = newintracomm->ep_comm_ptr->size_rank_info[0].first; 223 intra_ep_rank_loc = newintracomm->ep_comm_ptr->size_rank_info[1].first; 224 intra_mpi_rank = newintracomm->ep_comm_ptr->size_rank_info[2].first; 225 intra_ep_size = newintracomm->ep_comm_ptr->size_rank_info[0].second; 226 intra_num_ep = newintracomm->ep_comm_ptr->size_rank_info[1].second; 227 intra_mpi_size = newintracomm->ep_comm_ptr->size_rank_info[2].second; 228 229 230 231 MPI_Barrier_local(*newintracomm); 232 233 234 int *reorder; 235 if(intra_ep_rank_loc == 0) 236 { 237 reorder = new int[intra_ep_size]; 238 } 239 240 241 242 MPI_Gather(&my_ep_rank, 1, MPI_INT, reorder, 1, MPI_INT, 0, *newintracomm); 243 if(intra_ep_rank_loc == 0) 244 { 245 246 ::MPI_Bcast(reorder, intra_ep_size, MPI_INT, 0, static_cast< ::MPI_Comm>(newintracomm->mpi_comm)); 247 248 vector< pair<int, int> > tmp_rank_map(intra_ep_size); 249 250 251 for(int i=0; i<intra_ep_size; i++) 252 { 253 tmp_rank_map[reorder[i]] = newintracomm->rank_map->at(i) ; 254 } 255 256 newintracomm->rank_map->swap(tmp_rank_map); 257 258 tmp_rank_map.clear(); 259 } 260 261 MPI_Barrier_local(*newintracomm); 211 262 212 263 (*newintracomm).ep_comm_ptr->size_rank_info[0].first = my_ep_rank; 213 (*newintracomm).ep_comm_ptr->size_rank_info[1].first = tmp_intra_ep_rank_loc; 214 215 216 delete[] rankmap_buf; 217 218 264 265 266 if(intra_ep_rank_loc == 0) 267 { 268 delete[] reorder; 269 270 } 271 272 /* 273 if(intra_ep_rank == 0) 274 { 275 for(int i=0; i<intra_ep_size; i++) 276 { 277 printf("intra rank_map[%d] = (%d, %d)\n", i, newintracomm->rank_map->at(i).first, newintracomm->rank_map->at(i).second); 278 } 279 } 280 */ 219 281 return MPI_SUCCESS; 220 282 -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp
r1287 r1289 9 9 #include <mpi.h> 10 10 #include "ep_declaration.hpp" 11 #include "ep_mpi.hpp"12 11 13 12 using namespace std; … … 21 20 int Message_Check(MPI_Comm comm) 22 21 { 22 int myRank; 23 MPI_Comm_rank(comm, &myRank); 24 23 25 if(!comm.is_ep) return 0; 24 26 … … 36 38 { 37 39 Debug("Message probing for intracomm\n"); 38 39 40 ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.mpi_comm); 40 41 #ifdef _openmpi 41 42 #pragma omp critical (_mpi_call) 42 43 { 43 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);44 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status); 44 45 if(flag) 45 46 { … … 47 48 mpi_source = status.MPI_SOURCE; 48 49 int tag = status.MPI_TAG; 49 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);50 ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status); 50 51 51 52 } 52 53 } 53 54 #elif _intelmpi 54 ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &message, &status); 55 #pragma omp critical (_mpi_call) 56 ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status); 55 57 #endif 56 58 … … 66 68 67 69 msg_block->ep_src = get_ep_rank(comm, src_loc, src_mpi); 70 int dest_mpi = comm.ep_comm_ptr->size_rank_info[2].first; 71 int ep_dest = get_ep_rank(comm, dest_loc, dest_mpi); 68 72 msg_block->mpi_status = new ::MPI_Status(status); 69 73 … … 75 79 { 76 80 #pragma omp flush 77 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block); 81 ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block); 82 78 83 #pragma omp flush 79 84 } … … 105 110 { 106 111 Debug("Message probing for intracomm\n"); 107 112 ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.ep_comm_ptr->intercomm->mpi_inter_comm); // => mpi_intercomm 113 108 114 #ifdef _openmpi 109 115 #pragma omp critical (_mpi_call) 110 116 { 111 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);117 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status); 112 118 if(flag) 113 119 { … … 115 121 mpi_source = status.MPI_SOURCE; 116 122 int tag = status.MPI_TAG; 117 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);123 ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status); 118 124 119 125 } 120 126 } 121 127 #elif _intelmpi 122 ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &message, &status); 128 #pragma omp critical (_mpi_call) 129 ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status); 123 130 #endif 124 131 … … 146 153 { 147 154 #pragma omp flush 148 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);155 ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block); 149 156 #pragma omp flush 150 157 } … … 160 167 { 161 168 Debug("Message probing for intracomm\n"); 162 169 ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.mpi_comm); 163 170 #ifdef _openmpi 164 171 #pragma omp critical (_mpi_call) 165 172 { 166 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);173 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &status); 167 174 if(flag) 168 175 { … … 170 177 mpi_source = status.MPI_SOURCE; 171 178 int tag = status.MPI_TAG; 172 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);179 ::MPI_Mprobe(mpi_source, tag, mpi_comm, &message, &status); 173 180 174 181 } 175 182 } 176 183 #elif _intelmpi 177 ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &message, &status); 184 #pragma omp critical (_mpi_call) 185 ::MPI_Improbe(MPI_ANY_SOURCE, MPI_ANY_TAG, mpi_comm, &flag, &message, &status); 178 186 #endif 179 187 … … 188 196 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 189 197 int src_mpi = status.MPI_SOURCE; 198 int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first; 190 199 191 200 msg_block->ep_src = get_ep_rank_intercomm(comm, src_loc, src_mpi); … … 200 209 { 201 210 #pragma omp flush 202 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);211 ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block); 203 212 #pragma omp flush 204 213 } -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_reduce.cpp
r1287 r1289 9 9 #include <mpi.h> 10 10 #include "ep_declaration.hpp" 11 #include "ep_mpi.hpp"12 11 13 12 using namespace std; … … 28 27 } 29 28 30 template<typename T> 31 void reduce_max(const T * buffer, T* recvbuf, int count) 32 { 33 transform(buffer, buffer+count, recvbuf, recvbuf, max_op<T>); 34 } 35 36 template<typename T> 37 void reduce_min(const T * buffer, T* recvbuf, int count) 38 { 39 transform(buffer, buffer+count, recvbuf, recvbuf, min_op<T>); 40 } 41 42 template<typename T> 43 void reduce_sum(const T * buffer, T* recvbuf, int count) 44 { 45 transform(buffer, buffer+count, recvbuf, recvbuf, std::plus<T>()); 46 } 47 48 int MPI_Reduce_local(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int local_root, MPI_Comm comm) 49 { 50 assert(valid_type(datatype)); 51 assert(valid_op(op)); 52 53 ::MPI_Aint datasize, lb; 54 ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 55 56 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 57 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 58 59 #pragma omp critical (_reduce) 60 comm.my_buffer->void_buffer[ep_rank_loc] = const_cast< void* >(sendbuf); 61 62 MPI_Barrier_local(comm); 63 64 if(ep_rank_loc == local_root) 65 { 66 memcpy(recvbuf, comm.my_buffer->void_buffer[0], datasize * count); 67 68 if(op == MPI_MAX) 69 { 70 if(datasize == sizeof(int)) 29 30 int MPI_Reduce_local2(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 31 { 32 if(datatype == MPI_INT) 33 { 34 Debug("datatype is INT\n"); 35 return MPI_Reduce_local_int(sendbuf, recvbuf, count, op, comm); 36 } 37 else if(datatype == MPI_FLOAT) 38 { 39 Debug("datatype is FLOAT\n"); 40 return MPI_Reduce_local_float(sendbuf, recvbuf, count, op, comm); 41 } 42 else if(datatype == MPI_DOUBLE) 43 { 44 Debug("datatype is DOUBLE\n"); 45 return MPI_Reduce_local_double(sendbuf, recvbuf, count, op, comm); 46 } 47 else if(datatype == MPI_LONG) 48 { 49 Debug("datatype is DOUBLE\n"); 50 return MPI_Reduce_local_long(sendbuf, recvbuf, count, op, comm); 51 } 52 else if(datatype == MPI_UNSIGNED_LONG) 53 { 54 Debug("datatype is DOUBLE\n"); 55 return MPI_Reduce_local_ulong(sendbuf, recvbuf, count, op, comm); 56 } 57 else if(datatype == MPI_CHAR) 58 { 59 Debug("datatype is DOUBLE\n"); 60 return MPI_Reduce_local_char(sendbuf, recvbuf, count, op, comm); 61 } 62 else 63 { 64 printf("MPI_Reduce Datatype not supported!\n"); 65 exit(0); 66 } 67 } 68 69 70 int MPI_Reduce_local_int(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 71 { 72 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 73 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 74 75 int *buffer = comm.my_buffer->buf_int; 76 int *send_buf = static_cast<int*>(const_cast<void*>(sendbuf)); 77 int *recv_buf = static_cast<int*>(const_cast<void*>(recvbuf)); 78 79 for(int j=0; j<count; j+=BUFFER_SIZE) 80 { 81 if( 0 == my_rank ) 82 { 83 #pragma omp critical (write_to_buffer) 84 copy(send_buf+j, send_buf+j + min(BUFFER_SIZE, count-j), buffer); 85 #pragma omp flush 86 } 87 88 MPI_Barrier_local(comm); 89 90 if(my_rank !=0 ) 91 { 92 #pragma omp critical (write_to_buffer) 71 93 { 72 for(int i=1; i<num_ep; i++) 73 reduce_max<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 94 #pragma omp flush 95 if(op == MPI_SUM) 96 { 97 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<int>()); 98 } 99 100 else if (op == MPI_MAX) 101 { 102 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<int>); 103 } 104 105 else if (op == MPI_MIN) 106 { 107 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<int>); 108 } 109 110 else 111 { 112 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 113 exit(1); 114 } 115 #pragma omp flush 74 116 } 75 76 else if(datasize == sizeof(float)) 117 } 118 119 MPI_Barrier_local(comm); 120 121 if(my_rank == 0) 122 { 123 #pragma omp flush 124 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 125 } 126 MPI_Barrier_local(comm); 127 } 128 } 129 130 131 int MPI_Reduce_local_float(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 132 { 133 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 134 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 135 136 float *buffer = comm.my_buffer->buf_float; 137 float *send_buf = static_cast<float*>(const_cast<void*>(sendbuf)); 138 float *recv_buf = static_cast<float*>(const_cast<void*>(recvbuf)); 139 140 for(int j=0; j<count; j+=BUFFER_SIZE) 141 { 142 if( 0 == my_rank ) 143 { 144 #pragma omp critical (write_to_buffer) 145 copy(send_buf+j, send_buf+j + min(BUFFER_SIZE, count-j), buffer); 146 #pragma omp flush 147 } 148 149 MPI_Barrier_local(comm); 150 151 if(my_rank !=0 ) 152 { 153 #pragma omp critical (write_to_buffer) 77 154 { 78 for(int i=1; i<num_ep; i++) 79 reduce_max<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 155 #pragma omp flush 156 157 if(op == MPI_SUM) 158 { 159 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<float>()); 160 } 161 162 else if (op == MPI_MAX) 163 { 164 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<float>); 165 } 166 167 else if (op == MPI_MIN) 168 { 169 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<float>); 170 } 171 172 else 173 { 174 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 175 exit(1); 176 } 177 #pragma omp flush 80 178 } 81 82 else if(datasize == sizeof(double)) 179 } 180 181 MPI_Barrier_local(comm); 182 183 if(my_rank == 0) 184 { 185 #pragma omp flush 186 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 187 } 188 MPI_Barrier_local(comm); 189 } 190 } 191 192 int MPI_Reduce_local_double(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 193 { 194 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 195 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 196 197 double *buffer = comm.my_buffer->buf_double; 198 double *send_buf = static_cast<double*>(const_cast<void*>(sendbuf)); 199 double *recv_buf = static_cast<double*>(const_cast<void*>(recvbuf)); 200 201 for(int j=0; j<count; j+=BUFFER_SIZE) 202 { 203 if( 0 == my_rank ) 204 { 205 #pragma omp critical (write_to_buffer) 206 copy(send_buf+j, send_buf+j + min(BUFFER_SIZE, count-j), buffer); 207 #pragma omp flush 208 } 209 210 MPI_Barrier_local(comm); 211 212 if(my_rank !=0 ) 213 { 214 #pragma omp critical (write_to_buffer) 83 215 { 84 for(int i=1; i<num_ep; i++) 85 reduce_max<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 216 #pragma omp flush 217 218 219 if(op == MPI_SUM) 220 { 221 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<double>()); 222 } 223 224 else if (op == MPI_MAX) 225 { 226 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<double>); 227 } 228 229 230 else if (op == MPI_MIN) 231 { 232 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<double>); 233 } 234 235 else 236 { 237 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 238 exit(1); 239 } 240 #pragma omp flush 86 241 } 87 88 else if(datasize == sizeof(char)) 242 } 243 244 MPI_Barrier_local(comm); 245 246 if(my_rank == 0) 247 { 248 #pragma omp flush 249 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 250 } 251 MPI_Barrier_local(comm); 252 } 253 } 254 255 int MPI_Reduce_local_long(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 256 { 257 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 258 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 259 260 long *buffer = comm.my_buffer->buf_long; 261 long *send_buf = static_cast<long*>(const_cast<void*>(sendbuf)); 262 long *recv_buf = static_cast<long*>(const_cast<void*>(recvbuf)); 263 264 for(int j=0; j<count; j+=BUFFER_SIZE) 265 { 266 if( 0 == my_rank ) 267 { 268 #pragma omp critical (write_to_buffer) 269 copy(send_buf+j, send_buf+j + min(BUFFER_SIZE, count-j), buffer); 270 #pragma omp flush 271 } 272 273 MPI_Barrier_local(comm); 274 275 if(my_rank !=0 ) 276 { 277 #pragma omp critical (write_to_buffer) 89 278 { 90 for(int i=1; i<num_ep; i++) 91 reduce_max<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 279 #pragma omp flush 280 281 282 if(op == MPI_SUM) 283 { 284 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<long>()); 285 } 286 287 else if (op == MPI_MAX) 288 { 289 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<long>); 290 } 291 292 293 else if (op == MPI_MIN) 294 { 295 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<long>); 296 } 297 298 else 299 { 300 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 301 exit(1); 302 } 303 #pragma omp flush 92 304 } 93 94 else if(datasize == sizeof(long)) 305 } 306 307 MPI_Barrier_local(comm); 308 309 if(my_rank == 0) 310 { 311 #pragma omp flush 312 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 313 } 314 MPI_Barrier_local(comm); 315 } 316 } 317 318 int MPI_Reduce_local_ulong(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 319 { 320 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 321 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 322 323 unsigned long *buffer = comm.my_buffer->buf_ulong; 324 unsigned long *send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf)); 325 unsigned long *recv_buf = static_cast<unsigned long*>(const_cast<void*>(recvbuf)); 326 327 for(int j=0; j<count; j+=BUFFER_SIZE) 328 { 329 if( 0 == my_rank ) 330 { 331 #pragma omp critical (write_to_buffer) 332 copy(send_buf+j, send_buf+j + min(BUFFER_SIZE, count-j), buffer); 333 #pragma omp flush 334 } 335 336 MPI_Barrier_local(comm); 337 338 if(my_rank !=0 ) 339 { 340 #pragma omp critical (write_to_buffer) 95 341 { 96 for(int i=1; i<num_ep; i++) 97 reduce_max<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 342 #pragma omp flush 343 344 345 if(op == MPI_SUM) 346 { 347 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<unsigned long>()); 348 } 349 350 else if (op == MPI_MAX) 351 { 352 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<unsigned long>); 353 } 354 355 356 else if (op == MPI_MIN) 357 { 358 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<unsigned long>); 359 } 360 361 else 362 { 363 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 364 exit(1); 365 } 366 #pragma omp flush 98 367 } 99 100 else if(datasize == sizeof(unsigned long)) 368 } 369 370 MPI_Barrier_local(comm); 371 372 if(my_rank == 0) 373 { 374 #pragma omp flush 375 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 376 } 377 MPI_Barrier_local(comm); 378 } 379 } 380 381 int MPI_Reduce_local_char(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 382 { 383 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 384 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 385 386 char *buffer = comm.my_buffer->buf_char; 387 char *send_buf = static_cast<char*>(const_cast<void*>(sendbuf)); 388 char *recv_buf = static_cast<char*>(const_cast<void*>(recvbuf)); 389 390 for(int j=0; j<count; j+=BUFFER_SIZE) 391 { 392 if( 0 == my_rank ) 393 { 394 #pragma omp critical (write_to_buffer) 395 copy(send_buf+j, send_buf+j + min(BUFFER_SIZE, count-j), buffer); 396 #pragma omp flush 397 } 398 399 MPI_Barrier_local(comm); 400 401 if(my_rank !=0 ) 402 { 403 #pragma omp critical (write_to_buffer) 101 404 { 102 for(int i=1; i<num_ep; i++) 103 reduce_max<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 405 #pragma omp flush 406 407 408 if(op == MPI_SUM) 409 { 410 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<char>()); 411 } 412 413 else if (op == MPI_MAX) 414 { 415 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<char>); 416 } 417 418 419 else if (op == MPI_MIN) 420 { 421 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<char>); 422 } 423 424 else 425 { 426 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 427 exit(1); 428 } 429 #pragma omp flush 104 430 } 105 106 else printf("datatype Error\n"); 107 108 } 109 110 if(op == MPI_MIN) 111 { 112 if(datasize == sizeof(int)) 113 { 114 for(int i=1; i<num_ep; i++) 115 reduce_min<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 116 } 117 118 else if(datasize == sizeof(float)) 119 { 120 for(int i=1; i<num_ep; i++) 121 reduce_min<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 122 } 123 124 else if(datasize == sizeof(double)) 125 { 126 for(int i=1; i<num_ep; i++) 127 reduce_min<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 128 } 129 130 else if(datasize == sizeof(char)) 131 { 132 for(int i=1; i<num_ep; i++) 133 reduce_min<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 134 } 135 136 else if(datasize == sizeof(long)) 137 { 138 for(int i=1; i<num_ep; i++) 139 reduce_min<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 140 } 141 142 else if(datasize == sizeof(unsigned long)) 143 { 144 for(int i=1; i<num_ep; i++) 145 reduce_min<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 146 } 147 148 else printf("datatype Error\n"); 149 150 } 151 152 153 if(op == MPI_SUM) 154 { 155 if(datasize == sizeof(int)) 156 { 157 for(int i=1; i<num_ep; i++) 158 reduce_sum<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 159 } 160 161 else if(datasize == sizeof(float)) 162 { 163 for(int i=1; i<num_ep; i++) 164 reduce_sum<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 165 } 166 167 else if(datasize == sizeof(double)) 168 { 169 for(int i=1; i<num_ep; i++) 170 reduce_sum<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 171 } 172 173 else if(datasize == sizeof(char)) 174 { 175 for(int i=1; i<num_ep; i++) 176 reduce_sum<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 177 } 178 179 else if(datasize == sizeof(long)) 180 { 181 for(int i=1; i<num_ep; i++) 182 reduce_sum<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 183 } 184 185 else if(datasize == sizeof(unsigned long)) 186 { 187 for(int i=1; i<num_ep; i++) 188 reduce_sum<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 189 } 190 191 else printf("datatype Error\n"); 192 193 } 194 } 195 196 MPI_Barrier_local(comm); 197 431 } 432 433 MPI_Barrier_local(comm); 434 435 if(my_rank == 0) 436 { 437 #pragma omp flush 438 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 439 } 440 MPI_Barrier_local(comm); 441 } 198 442 } 199 443 … … 201 445 int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm) 202 446 { 203 204 447 if(!comm.is_ep && comm.mpi_comm) 205 448 { 206 return ::MPI_Reduce(sendbuf, recvbuf, count, to_mpi_type(datatype), to_mpi_op(op), root, to_mpi_comm(comm.mpi_comm)); 207 } 208 209 210 211 int ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 212 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 213 int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 214 int ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 215 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 216 int mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 449 ::MPI_Reduce(sendbuf, recvbuf, count, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Op>(op), root, 450 static_cast< ::MPI_Comm>(comm.mpi_comm)); 451 return 0; 452 } 453 454 455 if(!comm.mpi_comm) return 0; 217 456 218 457 int root_mpi_rank = comm.rank_map->at(root).second; 219 458 int root_ep_loc = comm.rank_map->at(root).first; 220 459 460 int ep_rank, ep_rank_loc, mpi_rank; 461 int ep_size, num_ep, mpi_size; 462 463 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 464 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 465 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 466 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 467 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 468 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 469 470 471 ::MPI_Aint recvsize, lb; 472 473 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &recvsize); 474 475 void *local_recvbuf; 476 if(ep_rank_loc==0) 477 { 478 local_recvbuf = new void*[recvsize*count]; 479 } 480 481 MPI_Reduce_local2(sendbuf, local_recvbuf, count, datatype, op, comm); 482 483 484 if(ep_rank_loc==0) 485 { 486 ::MPI_Reduce(local_recvbuf, recvbuf, count, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Op>(op), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 487 } 488 489 if(root_ep_loc != 0 && mpi_rank == root_mpi_rank) // root is not master, master send to root and root receive from master 490 { 491 innode_memcpy(0, recvbuf, root_ep_loc, recvbuf, count, datatype, comm); 492 } 493 494 if(ep_rank_loc==0) 495 { 496 if(datatype == MPI_INT) delete[] static_cast<int*>(local_recvbuf); 497 else if(datatype == MPI_FLOAT) delete[] static_cast<float*>(local_recvbuf); 498 else if(datatype == MPI_DOUBLE) delete[] static_cast<double*>(local_recvbuf); 499 else if(datatype == MPI_LONG) delete[] static_cast<long*>(local_recvbuf); 500 else if(datatype == MPI_UNSIGNED_LONG) delete[] static_cast<unsigned long*>(local_recvbuf); 501 else delete[] static_cast<char*>(local_recvbuf); 502 } 503 504 Message_Check(comm); 505 506 return 0; 507 } 508 509 510 511 512 int MPI_Allreduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 513 { 514 if(!comm.is_ep && comm.mpi_comm) 515 { 516 ::MPI_Allreduce(sendbuf, recvbuf, count, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Op>(op), 517 static_cast< ::MPI_Comm>(comm.mpi_comm)); 518 return 0; 519 } 520 521 if(!comm.mpi_comm) return 0; 522 523 524 int ep_rank, ep_rank_loc, mpi_rank; 525 int ep_size, num_ep, mpi_size; 526 527 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 528 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 529 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 530 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 531 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 532 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 533 534 535 ::MPI_Aint recvsize, lb; 536 537 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &recvsize); 538 539 void *local_recvbuf; 540 if(ep_rank_loc==0) 541 { 542 local_recvbuf = new void*[recvsize*count]; 543 } 544 545 MPI_Reduce_local2(sendbuf, local_recvbuf, count, datatype, op, comm); 546 547 548 if(ep_rank_loc==0) 549 { 550 ::MPI_Allreduce(local_recvbuf, recvbuf, count, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Op>(op), static_cast< ::MPI_Comm>(comm.mpi_comm)); 551 } 552 553 MPI_Bcast_local2(recvbuf, count, datatype, comm); 554 555 if(ep_rank_loc==0) 556 { 557 if(datatype == MPI_INT) delete[] static_cast<int*>(local_recvbuf); 558 else if(datatype == MPI_FLOAT) delete[] static_cast<float*>(local_recvbuf); 559 else if(datatype == MPI_DOUBLE) delete[] static_cast<double*>(local_recvbuf); 560 else if(datatype == MPI_LONG) delete[] static_cast<long*>(local_recvbuf); 561 else if(datatype == MPI_UNSIGNED_LONG) delete[] static_cast<unsigned long*>(local_recvbuf); 562 else delete[] static_cast<char*>(local_recvbuf); 563 } 564 565 Message_Check(comm); 566 567 return 0; 568 } 569 570 571 int MPI_Reduce_scatter(const void *sendbuf, void *recvbuf, const int recvcounts[], MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 572 { 573 574 if(!comm.is_ep && comm.mpi_comm) 575 { 576 ::MPI_Reduce_scatter(sendbuf, recvbuf, recvcounts, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Op>(op), 577 static_cast< ::MPI_Comm>(comm.mpi_comm)); 578 return 0; 579 } 580 581 if(!comm.mpi_comm) return 0; 582 583 int ep_rank, ep_rank_loc, mpi_rank; 584 int ep_size, num_ep, mpi_size; 585 586 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 587 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 588 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 589 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 590 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 591 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 592 593 void* local_buf; 594 void* local_buf2; 595 int local_buf_size = accumulate(recvcounts, recvcounts+ep_size, 0); 596 int local_buf2_size = accumulate(recvcounts+ep_rank-ep_rank_loc, recvcounts+ep_rank-ep_rank_loc+num_ep, 0); 597 221 598 ::MPI_Aint datasize, lb; 222 599 223 600 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 224 601 225 bool is_master = (ep_rank_loc==0 && mpi_rank != root_mpi_rank ) || ep_rank == root; 226 bool is_root = ep_rank == root; 227 228 void* local_recvbuf; 229 230 if(is_master) 231 { 232 local_recvbuf = new void*[datasize * count]; 233 } 234 235 if(mpi_rank == root_mpi_rank) MPI_Reduce_local(sendbuf, local_recvbuf, count, datatype, op, root_ep_loc, comm); 236 else MPI_Reduce_local(sendbuf, local_recvbuf, count, datatype, op, 0, comm); 237 238 239 240 if(is_master) 241 { 242 ::MPI_Reduce(local_recvbuf, recvbuf, count, to_mpi_type(datatype), to_mpi_op(op), root_mpi_rank, to_mpi_comm(comm.mpi_comm)); 243 244 } 245 246 if(is_master) 247 { 248 delete[] local_recvbuf; 249 } 250 251 MPI_Barrier_local(comm); 252 } 253 254 602 if(ep_rank_loc == 0) 603 { 604 local_buf = new void*[local_buf_size*datasize]; 605 local_buf2 = new void*[local_buf2_size*datasize]; 606 } 607 MPI_Reduce_local2(sendbuf, local_buf, local_buf_size, MPI_INT, op, comm); 608 609 610 if(ep_rank_loc == 0) 611 { 612 int local_recvcnt[mpi_size]; 613 for(int i=0; i<mpi_size; i++) 614 { 615 local_recvcnt[i] = accumulate(recvcounts+ep_rank, recvcounts+ep_rank+num_ep, 0); 616 } 617 618 ::MPI_Reduce_scatter(local_buf, local_buf2, local_recvcnt, static_cast< ::MPI_Datatype>(datatype), 619 static_cast< ::MPI_Op>(op), static_cast< ::MPI_Comm>(comm.mpi_comm)); 620 } 621 622 623 int displs[num_ep]; 624 displs[0] = 0; 625 for(int i=1; i<num_ep; i++) 626 { 627 displs[i] = displs[i-1] + recvcounts[ep_rank-ep_rank_loc+i-1]; 628 } 629 630 MPI_Scatterv_local2(local_buf2, recvcounts+ep_rank-ep_rank_loc, displs, datatype, recvbuf, comm); 631 632 if(ep_rank_loc == 0) 633 { 634 if(datatype == MPI_INT) 635 { 636 delete[] static_cast<int*>(local_buf); 637 delete[] static_cast<int*>(local_buf2); 638 } 639 else if(datatype == MPI_FLOAT) 640 { 641 delete[] static_cast<float*>(local_buf); 642 delete[] static_cast<float*>(local_buf2); 643 } 644 else if(datatype == MPI_DOUBLE) 645 { 646 delete[] static_cast<double*>(local_buf); 647 delete[] static_cast<double*>(local_buf2); 648 } 649 else if(datatype == MPI_LONG) 650 { 651 delete[] static_cast<long*>(local_buf); 652 delete[] static_cast<long*>(local_buf2); 653 } 654 else if(datatype == MPI_UNSIGNED_LONG) 655 { 656 delete[] static_cast<unsigned long*>(local_buf); 657 delete[] static_cast<unsigned long*>(local_buf2); 658 } 659 else // if(datatype == MPI_DOUBLE) 660 { 661 delete[] static_cast<char*>(local_buf); 662 delete[] static_cast<char*>(local_buf2); 663 } 664 } 665 666 Message_Check(comm); 667 return 0; 668 } 255 669 } 256 670 -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_scan.cpp
r1287 r1289 9 9 #include <mpi.h> 10 10 #include "ep_declaration.hpp" 11 #include "ep_mpi.hpp"12 11 13 12 using namespace std; … … 27 26 } 28 27 29 template<typename T> 30 void reduce_max(const T * buffer, T* recvbuf, int count) 31 { 32 transform(buffer, buffer+count, recvbuf, recvbuf, max_op<T>); 33 } 34 35 template<typename T> 36 void reduce_min(const T * buffer, T* recvbuf, int count) 37 { 38 transform(buffer, buffer+count, recvbuf, recvbuf, min_op<T>); 39 } 40 41 template<typename T> 42 void reduce_sum(const T * buffer, T* recvbuf, int count) 43 { 44 transform(buffer, buffer+count, recvbuf, recvbuf, std::plus<T>()); 45 } 46 47 48 int MPI_Scan_local(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 49 { 50 valid_op(op); 51 52 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 53 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 54 int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 55 28 29 int MPI_Scan_local2(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 30 { 31 if(datatype == MPI_INT) 32 { 33 return MPI_Scan_local_int(sendbuf, recvbuf, count, op, comm); 34 } 35 else if(datatype == MPI_FLOAT) 36 { 37 return MPI_Scan_local_float(sendbuf, recvbuf, count, op, comm); 38 } 39 else if(datatype == MPI_DOUBLE) 40 { 41 return MPI_Scan_local_double(sendbuf, recvbuf, count, op, comm); 42 } 43 else if(datatype == MPI_LONG) 44 { 45 return MPI_Scan_local_long(sendbuf, recvbuf, count, op, comm); 46 } 47 else if(datatype == MPI_UNSIGNED_LONG) 48 { 49 return MPI_Scan_local_ulong(sendbuf, recvbuf, count, op, comm); 50 } 51 else if(datatype == MPI_CHAR) 52 { 53 return MPI_Scan_local_char(sendbuf, recvbuf, count, op, comm); 54 } 55 else 56 { 57 printf("MPI_Scan Datatype not supported!\n"); 58 exit(0); 59 } 60 61 } 62 63 64 65 66 int MPI_Scan_local_int(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 67 { 68 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 69 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 70 71 int *buffer = comm.my_buffer->buf_int; 72 int *send_buf = static_cast<int*>(const_cast<void*>(sendbuf)); 73 int *recv_buf = static_cast<int*>(recvbuf); 74 75 for(int j=0; j<count; j+=BUFFER_SIZE) 76 { 77 if(my_rank == 0) 78 { 79 80 #pragma omp critical (write_to_buffer) 81 { 82 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 83 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), recv_buf+j); 84 #pragma omp flush 85 } 86 } 87 88 MPI_Barrier_local(comm); 89 90 for(int k=1; k<num_ep; k++) 91 { 92 #pragma omp critical (write_to_buffer) 93 { 94 if(my_rank == k) 95 { 96 #pragma omp flush 97 if(op == MPI_SUM) 98 { 99 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<int>()); 100 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 101 } 102 else if(op == MPI_MAX) 103 { 104 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<int>); 105 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 106 } 107 else if(op == MPI_MIN) 108 { 109 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<int>); 110 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 111 } 112 else 113 { 114 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 115 exit(1); 116 } 117 #pragma omp flush 118 } 119 } 120 121 MPI_Barrier_local(comm); 122 } 123 } 124 125 } 126 127 int MPI_Scan_local_float(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 128 { 129 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 130 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 131 132 float *buffer = comm.my_buffer->buf_float; 133 float *send_buf = static_cast<float*>(const_cast<void*>(sendbuf)); 134 float *recv_buf = static_cast<float*>(recvbuf); 135 136 for(int j=0; j<count; j+=BUFFER_SIZE) 137 { 138 if(my_rank == 0) 139 { 140 141 #pragma omp critical (write_to_buffer) 142 { 143 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 144 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), recv_buf+j); 145 #pragma omp flush 146 } 147 } 148 149 MPI_Barrier_local(comm); 150 151 for(int k=1; k<num_ep; k++) 152 { 153 #pragma omp critical (write_to_buffer) 154 { 155 if(my_rank == k) 156 { 157 #pragma omp flush 158 if(op == MPI_SUM) 159 { 160 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<float>()); 161 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 162 } 163 else if(op == MPI_MAX) 164 { 165 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<float>); 166 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 167 168 } 169 else if(op == MPI_MIN) 170 { 171 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<float>); 172 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 173 174 } 175 else 176 { 177 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 178 exit(1); 179 } 180 #pragma omp flush 181 } 182 } 183 184 MPI_Barrier_local(comm); 185 } 186 } 187 } 188 189 int MPI_Scan_local_double(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 190 { 191 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 192 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 193 194 double *buffer = comm.my_buffer->buf_double; 195 double *send_buf = static_cast<double*>(const_cast<void*>(sendbuf)); 196 double *recv_buf = static_cast<double*>(recvbuf); 197 198 for(int j=0; j<count; j+=BUFFER_SIZE) 199 { 200 if(my_rank == 0) 201 { 202 203 #pragma omp critical (write_to_buffer) 204 { 205 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 206 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), recv_buf+j); 207 #pragma omp flush 208 } 209 } 210 211 MPI_Barrier_local(comm); 212 213 for(int k=1; k<num_ep; k++) 214 { 215 #pragma omp critical (write_to_buffer) 216 { 217 if(my_rank == k) 218 { 219 #pragma omp flush 220 if(op == MPI_SUM) 221 { 222 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<double>()); 223 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 224 } 225 else if(op == MPI_MAX) 226 { 227 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<double>); 228 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 229 } 230 else if(op == MPI_MIN) 231 { 232 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<double>); 233 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 234 } 235 else 236 { 237 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 238 exit(1); 239 } 240 #pragma omp flush 241 } 242 } 243 244 MPI_Barrier_local(comm); 245 } 246 } 247 } 248 249 int MPI_Scan_local_long(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 250 { 251 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 252 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 253 254 long *buffer = comm.my_buffer->buf_long; 255 long *send_buf = static_cast<long*>(const_cast<void*>(sendbuf)); 256 long *recv_buf = static_cast<long*>(recvbuf); 257 258 for(int j=0; j<count; j+=BUFFER_SIZE) 259 { 260 if(my_rank == 0) 261 { 262 263 #pragma omp critical (write_to_buffer) 264 { 265 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 266 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), recv_buf+j); 267 #pragma omp flush 268 } 269 } 270 271 MPI_Barrier_local(comm); 272 273 for(int k=1; k<num_ep; k++) 274 { 275 #pragma omp critical (write_to_buffer) 276 { 277 if(my_rank == k) 278 { 279 #pragma omp flush 280 if(op == MPI_SUM) 281 { 282 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<long>()); 283 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 284 } 285 else if(op == MPI_MAX) 286 { 287 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<long>); 288 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 289 } 290 else if(op == MPI_MIN) 291 { 292 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<long>); 293 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 294 } 295 else 296 { 297 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 298 exit(1); 299 } 300 #pragma omp flush 301 } 302 } 303 304 MPI_Barrier_local(comm); 305 } 306 } 307 } 308 309 int MPI_Scan_local_ulong(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 310 { 311 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 312 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 313 314 unsigned long *buffer = comm.my_buffer->buf_ulong; 315 unsigned long *send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf)); 316 unsigned long *recv_buf = static_cast<unsigned long*>(recvbuf); 317 318 for(int j=0; j<count; j+=BUFFER_SIZE) 319 { 320 if(my_rank == 0) 321 { 322 323 #pragma omp critical (write_to_buffer) 324 { 325 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 326 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), recv_buf+j); 327 #pragma omp flush 328 } 329 } 330 331 MPI_Barrier_local(comm); 332 333 for(int k=1; k<num_ep; k++) 334 { 335 #pragma omp critical (write_to_buffer) 336 { 337 if(my_rank == k) 338 { 339 #pragma omp flush 340 if(op == MPI_SUM) 341 { 342 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<unsigned long>()); 343 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 344 } 345 else if(op == MPI_MAX) 346 { 347 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<unsigned long>); 348 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 349 } 350 else if(op == MPI_MIN) 351 { 352 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<unsigned long>); 353 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 354 } 355 else 356 { 357 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 358 exit(1); 359 } 360 #pragma omp flush 361 } 362 } 363 364 MPI_Barrier_local(comm); 365 } 366 } 367 } 368 369 int MPI_Scan_local_char(const void *sendbuf, void *recvbuf, int count, MPI_Op op, MPI_Comm comm) 370 { 371 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 372 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 373 374 char *buffer = comm.my_buffer->buf_char; 375 char *send_buf = static_cast<char*>(const_cast<void*>(sendbuf)); 376 char *recv_buf = static_cast<char*>(recvbuf); 377 378 for(int j=0; j<count; j+=BUFFER_SIZE) 379 { 380 if(my_rank == 0) 381 { 382 383 #pragma omp critical (write_to_buffer) 384 { 385 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), buffer); 386 copy(send_buf+j, send_buf+j+min(BUFFER_SIZE, count-j), recv_buf+j); 387 #pragma omp flush 388 } 389 } 390 391 MPI_Barrier_local(comm); 392 393 for(int k=1; k<num_ep; k++) 394 { 395 #pragma omp critical (write_to_buffer) 396 { 397 if(my_rank == k) 398 { 399 #pragma omp flush 400 if(op == MPI_SUM) 401 { 402 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, std::plus<char>()); 403 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 404 } 405 else if(op == MPI_MAX) 406 { 407 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, max_op<char>); 408 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 409 } 410 else if(op == MPI_MIN) 411 { 412 transform(buffer, buffer+min(BUFFER_SIZE, count-j), send_buf+j, buffer, min_op<char>); 413 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 414 } 415 else 416 { 417 printf("Supported operation: MPI_SUM, MPI_MAX, MPI_MIN\n"); 418 exit(1); 419 } 420 #pragma omp flush 421 } 422 } 423 424 MPI_Barrier_local(comm); 425 } 426 } 427 } 428 429 430 int MPI_Scan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 431 { 432 if(!comm.is_ep) 433 { 434 435 ::MPI_Scan(sendbuf, recvbuf, count, static_cast< ::MPI_Datatype>(datatype), 436 static_cast< ::MPI_Op>(op), static_cast< ::MPI_Comm>(comm.mpi_comm)); 437 return 0; 438 } 439 440 if(!comm.mpi_comm) return 0; 441 442 int ep_rank, ep_rank_loc, mpi_rank; 443 int ep_size, num_ep, mpi_size; 444 445 446 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 447 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 448 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 449 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 450 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 451 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 452 56 453 57 454 ::MPI_Aint datasize, lb; 58 ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 59 60 if(ep_rank_loc == 0 && mpi_rank != 0) 61 { 62 if(op == MPI_SUM) 63 { 64 if(datatype == MPI_INT && datasize == sizeof(int)) 65 reduce_sum<int>(static_cast<int*>(const_cast<void*>(sendbuf)), static_cast<int*>(recvbuf), count); 66 67 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 68 reduce_sum<float>(static_cast<float*>(const_cast<void*>(sendbuf)), static_cast<float*>(recvbuf), count); 69 70 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 71 reduce_sum<double>(static_cast<double*>(const_cast<void*>(sendbuf)), static_cast<double*>(recvbuf), count); 72 73 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 74 reduce_sum<char>(static_cast<char*>(const_cast<void*>(sendbuf)), static_cast<char*>(recvbuf), count); 75 76 else if(datatype == MPI_LONG && datasize == sizeof(long)) 77 reduce_sum<long>(static_cast<long*>(const_cast<void*>(sendbuf)), static_cast<long*>(recvbuf), count); 78 79 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 80 reduce_sum<unsigned long>(static_cast<unsigned long*>(const_cast<void*>(sendbuf)), static_cast<unsigned long*>(recvbuf), count); 81 82 else printf("datatype Error\n"); 83 } 84 85 else if(op == MPI_MAX) 86 { 87 if(datatype == MPI_INT && datasize == sizeof(int)) 88 reduce_max<int>(static_cast<int*>(const_cast<void*>(sendbuf)), static_cast<int*>(recvbuf), count); 89 90 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 91 reduce_max<float>(static_cast<float*>(const_cast<void*>(sendbuf)), static_cast<float*>(recvbuf), count); 92 93 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 94 reduce_max<double>(static_cast<double*>(const_cast<void*>(sendbuf)), static_cast<double*>(recvbuf), count); 95 96 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 97 reduce_max<char>(static_cast<char*>(const_cast<void*>(sendbuf)), static_cast<char*>(recvbuf), count); 98 99 else if(datatype == MPI_LONG && datasize == sizeof(long)) 100 reduce_max<long>(static_cast<long*>(const_cast<void*>(sendbuf)), static_cast<long*>(recvbuf), count); 101 102 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 103 reduce_max<unsigned long>(static_cast<unsigned long*>(const_cast<void*>(sendbuf)), static_cast<unsigned long*>(recvbuf), count); 104 105 else printf("datatype Error\n"); 106 } 107 108 else //(op == MPI_MIN) 109 { 110 if(datatype == MPI_INT && datasize == sizeof(int)) 111 reduce_min<int>(static_cast<int*>(const_cast<void*>(sendbuf)), static_cast<int*>(recvbuf), count); 112 113 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 114 reduce_min<float>(static_cast<float*>(const_cast<void*>(sendbuf)), static_cast<float*>(recvbuf), count); 115 116 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 117 reduce_min<double>(static_cast<double*>(const_cast<void*>(sendbuf)), static_cast<double*>(recvbuf), count); 118 119 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 120 reduce_min<char>(static_cast<char*>(const_cast<void*>(sendbuf)), static_cast<char*>(recvbuf), count); 121 122 else if(datatype == MPI_LONG && datasize == sizeof(long)) 123 reduce_min<long>(static_cast<long*>(const_cast<void*>(sendbuf)), static_cast<long*>(recvbuf), count); 124 125 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 126 reduce_min<unsigned long>(static_cast<unsigned long*>(const_cast<void*>(sendbuf)), static_cast<unsigned long*>(recvbuf), count); 127 128 else printf("datatype Error\n"); 129 } 130 131 comm.my_buffer->void_buffer[0] = recvbuf; 132 } 133 else 134 { 135 comm.my_buffer->void_buffer[ep_rank_loc] = const_cast<void*>(sendbuf); 136 memcpy(recvbuf, sendbuf, datasize*count); 137 } 138 139 140 141 MPI_Barrier_local(comm); 142 143 memcpy(recvbuf, comm.my_buffer->void_buffer[0], datasize*count); 144 145 146 if(op == MPI_SUM) 147 { 148 if(datatype == MPI_INT && datasize == sizeof(int)) 149 { 150 for(int i=1; i<ep_rank_loc+1; i++) 151 reduce_sum<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 152 } 153 154 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 155 { 156 for(int i=1; i<ep_rank_loc+1; i++) 157 reduce_sum<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 158 } 159 160 161 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 162 { 163 for(int i=1; i<ep_rank_loc+1; i++) 164 reduce_sum<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 165 } 166 167 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 168 { 169 for(int i=1; i<ep_rank_loc+1; i++) 170 reduce_sum<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 171 } 172 173 else if(datatype == MPI_LONG && datasize == sizeof(long)) 174 { 175 for(int i=1; i<ep_rank_loc+1; i++) 176 reduce_sum<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 177 } 178 179 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 180 { 181 for(int i=1; i<ep_rank_loc+1; i++) 182 reduce_sum<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 183 } 184 185 else printf("datatype Error\n"); 186 187 188 } 189 190 else if(op == MPI_MAX) 191 { 192 if(datatype == MPI_INT && datasize == sizeof(int)) 193 for(int i=1; i<ep_rank_loc+1; i++) 194 reduce_max<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 195 196 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 197 for(int i=1; i<ep_rank_loc+1; i++) 198 reduce_max<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 199 200 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 201 for(int i=1; i<ep_rank_loc+1; i++) 202 reduce_max<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 203 204 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 205 for(int i=1; i<ep_rank_loc+1; i++) 206 reduce_max<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 207 208 else if(datatype == MPI_LONG && datasize == sizeof(long)) 209 for(int i=1; i<ep_rank_loc+1; i++) 210 reduce_max<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 211 212 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 213 for(int i=1; i<ep_rank_loc+1; i++) 214 reduce_max<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 215 216 else printf("datatype Error\n"); 217 } 218 219 else //if(op == MPI_MIN) 220 { 221 if(datatype == MPI_INT && datasize == sizeof(int)) 222 for(int i=1; i<ep_rank_loc+1; i++) 223 reduce_min<int>(static_cast<int*>(comm.my_buffer->void_buffer[i]), static_cast<int*>(recvbuf), count); 224 225 else if(datatype == MPI_FLOAT && datasize == sizeof(float)) 226 for(int i=1; i<ep_rank_loc+1; i++) 227 reduce_min<float>(static_cast<float*>(comm.my_buffer->void_buffer[i]), static_cast<float*>(recvbuf), count); 228 229 else if(datatype == MPI_DOUBLE && datasize == sizeof(double)) 230 for(int i=1; i<ep_rank_loc+1; i++) 231 reduce_min<double>(static_cast<double*>(comm.my_buffer->void_buffer[i]), static_cast<double*>(recvbuf), count); 232 233 else if(datatype == MPI_CHAR && datasize == sizeof(char)) 234 for(int i=1; i<ep_rank_loc+1; i++) 235 reduce_min<char>(static_cast<char*>(comm.my_buffer->void_buffer[i]), static_cast<char*>(recvbuf), count); 236 237 else if(datatype == MPI_LONG && datasize == sizeof(long)) 238 for(int i=1; i<ep_rank_loc+1; i++) 239 reduce_min<long>(static_cast<long*>(comm.my_buffer->void_buffer[i]), static_cast<long*>(recvbuf), count); 240 241 else if(datatype == MPI_UNSIGNED_LONG && datasize == sizeof(unsigned long)) 242 for(int i=1; i<ep_rank_loc+1; i++) 243 reduce_min<unsigned long>(static_cast<unsigned long*>(comm.my_buffer->void_buffer[i]), static_cast<unsigned long*>(recvbuf), count); 244 245 else printf("datatype Error\n"); 246 } 247 248 MPI_Barrier_local(comm); 249 250 } 251 252 253 int MPI_Scan(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype, MPI_Op op, MPI_Comm comm) 254 { 255 if(!comm.is_ep) 256 { 257 return ::MPI_Scan(sendbuf, recvbuf, count, to_mpi_type(datatype), to_mpi_op(op), to_mpi_comm(comm.mpi_comm)); 258 } 259 260 valid_type(datatype); 261 262 int ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 263 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 264 int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 265 int ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 266 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 267 int mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 268 269 ::MPI_Aint datasize, lb; 270 ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); 271 272 void* tmp_sendbuf; 273 tmp_sendbuf = new void*[datasize * count]; 274 275 int my_src = 0; 276 int my_dst = ep_rank; 277 278 std::vector<int> my_map(mpi_size, 0); 279 280 for(int i=0; i<comm.rank_map->size(); i++) my_map[comm.rank_map->at(i).second]++; 281 282 for(int i=0; i<mpi_rank; i++) my_src += my_map[i]; 283 my_src += ep_rank_loc; 284 285 286 for(int i=0; i<mpi_size; i++) 287 { 288 if(my_dst < my_map[i]) 289 { 290 my_dst = get_ep_rank(comm, my_dst, i); 291 break; 292 } 293 else 294 my_dst -= my_map[i]; 295 } 296 297 //printf("ID = %d : send to %d, recv from %d\n", ep_rank, my_dst, my_src); 298 MPI_Barrier(comm); 299 300 if(my_dst == ep_rank && my_src == ep_rank) memcpy(tmp_sendbuf, sendbuf, datasize*count); 301 302 if(ep_rank != my_dst) 303 { 304 MPI_Request request[2]; 305 MPI_Status status[2]; 306 307 MPI_Isend(sendbuf, count, datatype, my_dst, my_dst, comm, &request[0]); 308 309 MPI_Irecv(tmp_sendbuf, count, datatype, my_src, ep_rank, comm, &request[1]); 310 311 MPI_Waitall(2, request, status); 312 } 313 314 315 void* tmp_recvbuf; 316 tmp_recvbuf = new void*[datasize * count]; 317 318 MPI_Reduce_local(tmp_sendbuf, tmp_recvbuf, count, datatype, op, 0, comm); 455 456 ::MPI_Type_get_extent(static_cast< ::MPI_Datatype>(datatype), &lb, &datasize); 457 458 void* local_scan_recvbuf; 459 local_scan_recvbuf = new void*[datasize * count]; 460 461 462 // local scan 463 MPI_Scan_local2(sendbuf, recvbuf, count, datatype, op, comm); 464 465 // MPI_scan 466 void* local_sum; 467 void* mpi_scan_recvbuf; 468 469 470 mpi_scan_recvbuf = new void*[datasize*count]; 319 471 320 472 if(ep_rank_loc == 0) 321 ::MPI_Exscan(MPI_IN_PLACE, tmp_recvbuf, count, to_mpi_type(datatype), to_mpi_op(op), to_mpi_comm(comm.mpi_comm)); 322 323 //printf(" ID=%d : %d %d \n", ep_rank, static_cast<int*>(tmp_recvbuf)[0], static_cast<int*>(tmp_recvbuf)[1]); 324 325 MPI_Scan_local(tmp_sendbuf, tmp_recvbuf, count, datatype, op, comm); 326 327 // printf(" ID=%d : after local tmp_sendbuf = %d %d ; tmp_recvbuf = %d %d \n", ep_rank, static_cast<int*>(tmp_sendbuf)[0], static_cast<int*>(tmp_sendbuf)[1], static_cast<int*>(tmp_recvbuf)[0], static_cast<int*>(tmp_recvbuf)[1]); 328 329 330 331 if(ep_rank != my_src) 332 { 333 MPI_Request request[2]; 334 MPI_Status status[2]; 335 336 MPI_Isend(tmp_recvbuf, count, datatype, my_src, my_src, comm, &request[0]); 337 338 MPI_Irecv(recvbuf, count, datatype, my_dst, ep_rank, comm, &request[1]); 339 340 MPI_Waitall(2, request, status); 341 } 342 343 else memcpy(recvbuf, tmp_recvbuf, datasize*count); 344 345 346 347 348 delete[] tmp_sendbuf; 349 delete[] tmp_recvbuf; 473 { 474 local_sum = new void*[datasize*count]; 475 } 476 477 478 MPI_Reduce_local2(sendbuf, local_sum, count, datatype, op, comm); 479 480 if(ep_rank_loc == 0) 481 { 482 ::MPI_Exscan(local_sum, mpi_scan_recvbuf, count, static_cast< ::MPI_Datatype>(datatype), static_cast< ::MPI_Op>(op), static_cast< ::MPI_Comm>(comm.mpi_comm)); 483 } 484 485 486 if(mpi_rank > 0) 487 { 488 MPI_Bcast_local2(mpi_scan_recvbuf, count, datatype, comm); 489 } 490 491 492 if(datatype == MPI_DOUBLE) 493 { 494 double* sum_buf = static_cast<double*>(mpi_scan_recvbuf); 495 double* recv_buf = static_cast<double*>(recvbuf); 496 497 if(mpi_rank != 0) 498 { 499 if(op == MPI_SUM) 500 { 501 for(int i=0; i<count; i++) 502 { 503 recv_buf[i] += sum_buf[i]; 504 } 505 } 506 else if (op == MPI_MAX) 507 { 508 for(int i=0; i<count; i++) 509 { 510 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 511 } 512 } 513 else if(op == MPI_MIN) 514 { 515 for(int i=0; i<count; i++) 516 { 517 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 518 } 519 } 520 else 521 { 522 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 523 exit(1); 524 } 525 } 526 527 delete[] static_cast<double*>(mpi_scan_recvbuf); 528 if(ep_rank_loc == 0) 529 { 530 delete[] static_cast<double*>(local_sum); 531 } 532 } 533 534 else if(datatype == MPI_FLOAT) 535 { 536 float* sum_buf = static_cast<float*>(mpi_scan_recvbuf); 537 float* recv_buf = static_cast<float*>(recvbuf); 538 539 if(mpi_rank != 0) 540 { 541 if(op == MPI_SUM) 542 { 543 for(int i=0; i<count; i++) 544 { 545 recv_buf[i] += sum_buf[i]; 546 } 547 } 548 else if (op == MPI_MAX) 549 { 550 for(int i=0; i<count; i++) 551 { 552 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 553 } 554 } 555 else if(op == MPI_MIN) 556 { 557 for(int i=0; i<count; i++) 558 { 559 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 560 } 561 } 562 else 563 { 564 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 565 exit(1); 566 } 567 } 568 569 delete[] static_cast<float*>(mpi_scan_recvbuf); 570 if(ep_rank_loc == 0) 571 { 572 delete[] static_cast<float*>(local_sum); 573 } 574 } 575 576 else if(datatype == MPI_INT) 577 { 578 int* sum_buf = static_cast<int*>(mpi_scan_recvbuf); 579 int* recv_buf = static_cast<int*>(recvbuf); 580 581 if(mpi_rank != 0) 582 { 583 if(op == MPI_SUM) 584 { 585 for(int i=0; i<count; i++) 586 { 587 recv_buf[i] += sum_buf[i]; 588 } 589 } 590 else if (op == MPI_MAX) 591 { 592 for(int i=0; i<count; i++) 593 { 594 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 595 } 596 } 597 else if(op == MPI_MIN) 598 { 599 for(int i=0; i<count; i++) 600 { 601 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 602 } 603 } 604 else 605 { 606 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 607 exit(1); 608 } 609 } 610 611 delete[] static_cast<int*>(mpi_scan_recvbuf); 612 if(ep_rank_loc == 0) 613 { 614 delete[] static_cast<int*>(local_sum); 615 } 616 } 617 618 else if(datatype == MPI_LONG) 619 { 620 long* sum_buf = static_cast<long*>(mpi_scan_recvbuf); 621 long* recv_buf = static_cast<long*>(recvbuf); 622 623 if(mpi_rank != 0) 624 { 625 if(op == MPI_SUM) 626 { 627 for(int i=0; i<count; i++) 628 { 629 recv_buf[i] += sum_buf[i]; 630 } 631 } 632 else if (op == MPI_MAX) 633 { 634 for(int i=0; i<count; i++) 635 { 636 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 637 } 638 } 639 else if(op == MPI_MIN) 640 { 641 for(int i=0; i<count; i++) 642 { 643 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 644 } 645 } 646 else 647 { 648 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 649 exit(1); 650 } 651 } 652 653 delete[] static_cast<long*>(mpi_scan_recvbuf); 654 if(ep_rank_loc == 0) 655 { 656 delete[] static_cast<long*>(local_sum); 657 } 658 } 659 660 else if(datatype == MPI_UNSIGNED_LONG) 661 { 662 unsigned long* sum_buf = static_cast<unsigned long*>(mpi_scan_recvbuf); 663 unsigned long* recv_buf = static_cast<unsigned long*>(recvbuf); 664 665 if(mpi_rank != 0) 666 { 667 if(op == MPI_SUM) 668 { 669 for(int i=0; i<count; i++) 670 { 671 recv_buf[i] += sum_buf[i]; 672 } 673 } 674 else if (op == MPI_MAX) 675 { 676 for(int i=0; i<count; i++) 677 { 678 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 679 } 680 } 681 else if(op == MPI_MIN) 682 { 683 for(int i=0; i<count; i++) 684 { 685 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 686 } 687 } 688 else 689 { 690 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 691 exit(1); 692 } 693 } 694 695 delete[] static_cast<unsigned long*>(mpi_scan_recvbuf); 696 if(ep_rank_loc == 0) 697 { 698 delete[] static_cast<unsigned long*>(local_sum); 699 } 700 } 701 702 else if(datatype == MPI_CHAR) 703 { 704 char* sum_buf = static_cast<char*>(mpi_scan_recvbuf); 705 char* recv_buf = static_cast<char*>(recvbuf); 706 707 if(mpi_rank != 0) 708 { 709 if(op == MPI_SUM) 710 { 711 for(int i=0; i<count; i++) 712 { 713 recv_buf[i] += sum_buf[i]; 714 } 715 } 716 else if (op == MPI_MAX) 717 { 718 for(int i=0; i<count; i++) 719 { 720 recv_buf[i] = max(recv_buf[i], sum_buf[i]); 721 } 722 } 723 else if(op == MPI_MIN) 724 { 725 for(int i=0; i<count; i++) 726 { 727 recv_buf[i] = min(recv_buf[i], sum_buf[i]); 728 } 729 } 730 else 731 { 732 printf("Support operator for MPI_Scan is MPI_SUM, MPI_MAX, and MPI_MIN\n"); 733 exit(1); 734 } 735 } 736 737 delete[] static_cast<char*>(mpi_scan_recvbuf); 738 if(ep_rank_loc == 0) 739 { 740 delete[] static_cast<char*>(local_sum); 741 } 742 } 743 350 744 351 745 } -
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_scatter.cpp
r1287 r1289 9 9 #include <mpi.h> 10 10 #include "ep_declaration.hpp" 11 #include "ep_mpi.hpp"12 11 13 12 using namespace std; … … 16 15 { 17 16 18 int MPI_Scatter_local(void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int local_root, MPI_Comm comm) 19 { 20 assert(valid_type(sendtype) && valid_type(recvtype)); 21 assert(recvcount == sendcount); 22 23 ::MPI_Aint datasize, lb; 24 ::MPI_Type_get_extent(to_mpi_type(sendtype), &lb, &datasize); 25 26 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 27 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 28 29 30 if(ep_rank_loc == local_root) 31 comm.my_buffer->void_buffer[local_root] = const_cast<void*>(sendbuf); 32 33 MPI_Barrier_local(comm); 34 35 #pragma omp critical (_scatter) 36 memcpy(recvbuf, comm.my_buffer->void_buffer[local_root]+datasize*ep_rank_loc*sendcount, datasize * recvcount); 37 38 39 MPI_Barrier_local(comm); 40 } 17 int MPI_Scatter_local2(const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, MPI_Comm comm) 18 { 19 if(datatype == MPI_INT) 20 { 21 Debug("datatype is INT\n"); 22 return MPI_Scatter_local_int(sendbuf, count, recvbuf, comm); 23 } 24 else if(datatype == MPI_FLOAT) 25 { 26 Debug("datatype is FLOAT\n"); 27 return MPI_Scatter_local_float(sendbuf, count, recvbuf, comm); 28 } 29 else if(datatype == MPI_DOUBLE) 30 { 31 Debug("datatype is DOUBLE\n"); 32 return MPI_Scatter_local_double(sendbuf, count, recvbuf, comm); 33 } 34 else if(datatype == MPI_LONG) 35 { 36 Debug("datatype is LONG\n"); 37 return MPI_Scatter_local_long(sendbuf, count, recvbuf, comm); 38 } 39 else if(datatype == MPI_UNSIGNED_LONG) 40 { 41 Debug("datatype is uLONG\n"); 42 return MPI_Scatter_local_ulong(sendbuf, count, recvbuf, comm); 43 } 44 else if(datatype == MPI_CHAR) 45 { 46 Debug("datatype is CHAR\n"); 47 return MPI_Scatter_local_char(sendbuf, count, recvbuf, comm); 48 } 49 else 50 { 51 printf("MPI_Scatter Datatype not supported!\n"); 52 exit(0); 53 } 54 } 55 56 int MPI_Scatter_local_int(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 57 { 58 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 59 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 60 61 62 int *buffer = comm.my_buffer->buf_int; 63 int *send_buf = static_cast<int*>(const_cast<void*>(sendbuf)); 64 int *recv_buf = static_cast<int*>(recvbuf); 65 66 for(int k=0; k<num_ep; k++) 67 { 68 for(int j=0; j<count; j+=BUFFER_SIZE) 69 { 70 if(my_rank == 0) 71 { 72 #pragma omp critical (write_to_buffer) 73 { 74 copy(send_buf+k*count+j, send_buf+k*count+j+min(BUFFER_SIZE, count-j), buffer); 75 #pragma omp flush 76 } 77 } 78 79 MPI_Barrier_local(comm); 80 81 if(my_rank == k) 82 { 83 #pragma omp critical (read_from_buffer) 84 { 85 #pragma omp flush 86 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 87 } 88 } 89 MPI_Barrier_local(comm); 90 } 91 } 92 } 93 94 int MPI_Scatter_local_float(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 95 { 96 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 97 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 98 99 float *buffer = comm.my_buffer->buf_float; 100 float *send_buf = static_cast<float*>(const_cast<void*>(sendbuf)); 101 float *recv_buf = static_cast<float*>(recvbuf); 102 103 for(int k=0; k<num_ep; k++) 104 { 105 for(int j=0; j<count; j+=BUFFER_SIZE) 106 { 107 if(my_rank == 0) 108 { 109 #pragma omp critical (write_to_buffer) 110 { 111 copy(send_buf+k*count+j, send_buf+k*count+j+min(BUFFER_SIZE, count-j), buffer); 112 #pragma omp flush 113 } 114 } 115 116 MPI_Barrier_local(comm); 117 118 if(my_rank == k) 119 { 120 #pragma omp critical (read_from_buffer) 121 { 122 #pragma omp flush 123 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 124 } 125 } 126 MPI_Barrier_local(comm); 127 } 128 } 129 } 130 131 int MPI_Scatter_local_double(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 132 { 133 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 134 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 135 136 double *buffer = comm.my_buffer->buf_double; 137 double *send_buf = static_cast<double*>(const_cast<void*>(sendbuf)); 138 double *recv_buf = static_cast<double*>(recvbuf); 139 140 for(int k=0; k<num_ep; k++) 141 { 142 for(int j=0; j<count; j+=BUFFER_SIZE) 143 { 144 if(my_rank == 0) 145 { 146 #pragma omp critical (write_to_buffer) 147 { 148 copy(send_buf+k*count+j, send_buf+k*count+j+min(BUFFER_SIZE, count-j), buffer); 149 #pragma omp flush 150 } 151 } 152 153 MPI_Barrier_local(comm); 154 155 if(my_rank == k) 156 { 157 #pragma omp critical (read_from_buffer) 158 { 159 #pragma omp flush 160 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 161 } 162 } 163 MPI_Barrier_local(comm); 164 } 165 } 166 } 167 168 int MPI_Scatter_local_long(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 169 { 170 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 171 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 172 173 long *buffer = comm.my_buffer->buf_long; 174 long *send_buf = static_cast<long*>(const_cast<void*>(sendbuf)); 175 long *recv_buf = static_cast<long*>(recvbuf); 176 177 for(int k=0; k<num_ep; k++) 178 { 179 for(int j=0; j<count; j+=BUFFER_SIZE) 180 { 181 if(my_rank == 0) 182 { 183 #pragma omp critical (write_to_buffer) 184 { 185 copy(send_buf+k*count+j, send_buf+k*count+j+min(BUFFER_SIZE, count-j), buffer); 186 #pragma omp flush 187 } 188 } 189 190 MPI_Barrier_local(comm); 191 192 if(my_rank == k) 193 { 194 #pragma omp critical (read_from_buffer) 195 { 196 #pragma omp flush 197 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 198 } 199 } 200 MPI_Barrier_local(comm); 201 } 202 } 203 } 204 205 206 int MPI_Scatter_local_ulong(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 207 { 208 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 209 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 210 211 unsigned long *buffer = comm.my_buffer->buf_ulong; 212 unsigned long *send_buf = static_cast<unsigned long*>(const_cast<void*>(sendbuf)); 213 unsigned long *recv_buf = static_cast<unsigned long*>(recvbuf); 214 215 for(int k=0; k<num_ep; k++) 216 { 217 for(int j=0; j<count; j+=BUFFER_SIZE) 218 { 219 if(my_rank == 0) 220 { 221 #pragma omp critical (write_to_buffer) 222 { 223 copy(send_buf+k*count+j, send_buf+k*count+j+min(BUFFER_SIZE, count-j), buffer); 224 #pragma omp flush 225 } 226 } 227 228 MPI_Barrier_local(comm); 229 230 if(my_rank == k) 231 { 232 #pragma omp critical (read_from_buffer) 233 { 234 #pragma omp flush 235 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 236 } 237 } 238 MPI_Barrier_local(comm); 239 } 240 } 241 } 242 243 244 int MPI_Scatter_local_char(const void *sendbuf, int count, void *recvbuf, MPI_Comm comm) 245 { 246 int my_rank = comm.ep_comm_ptr->size_rank_info[1].first; 247 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 248 249 char *buffer = comm.my_buffer->buf_char; 250 char *send_buf = static_cast<char*>(const_cast<void*>(sendbuf)); 251 char *recv_buf = static_cast<char*>(recvbuf); 252 253 for(int k=0; k<num_ep; k++) 254 { 255 for(int j=0; j<count; j+=BUFFER_SIZE) 256 { 257 if(my_rank == 0) 258 { 259 #pragma omp critical (write_to_buffer) 260 { 261 copy(send_buf+k*count+j, send_buf+k*count+j+min(BUFFER_SIZE, count-j), buffer); 262 #pragma omp flush 263 } 264 } 265 266 MPI_Barrier_local(comm); 267 268 if(my_rank == k) 269 { 270 #pragma omp critical (read_from_buffer) 271 { 272 #pragma omp flush 273 copy(buffer, buffer+min(BUFFER_SIZE, count-j), recv_buf+j); 274 } 275 } 276 MPI_Barrier_local(comm); 277 } 278 } 279 } 280 281 282 283 41 284 42 285 int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm) … … 44 287 if(!comm.is_ep) 45 288 { 46 return ::MPI_Scatter(sendbuf, sendcount, to_mpi_type(sendtype), recvbuf, recvcount, to_mpi_type(recvtype), root, to_mpi_comm(comm.mpi_comm)); 47 } 48 49 assert(sendcount == recvcount); 50 51 int ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 52 int ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 53 int mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 54 int ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 55 int num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 56 int mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 289 ::MPI_Scatter(sendbuf, sendcount, static_cast< ::MPI_Datatype>(sendtype), recvbuf, recvcount, static_cast< ::MPI_Datatype>(recvtype), 290 root, static_cast< ::MPI_Comm>(comm.mpi_comm)); 291 return 0; 292 } 293 294 if(!comm.mpi_comm) return 0; 295 296 assert(static_cast< ::MPI_Datatype>(sendtype) == static_cast< ::MPI_Datatype>(recvtype) && sendcount == recvcount); 57 297 58 298 int root_mpi_rank = comm.rank_map->at(root).second; 59 299 int root_ep_loc = comm.rank_map->at(root).first; 60 300 61 bool is_master = (ep_rank_loc==0 && mpi_rank != root_mpi_rank ) || ep_rank == root; 62 bool is_root = ep_rank == root; 301 int ep_rank, ep_rank_loc, mpi_rank; 302 int ep_size, num_ep, mpi_size; 303 304 ep_rank = comm.ep_comm_ptr->size_rank_info[0].first; 305 ep_rank_loc = comm.ep_comm_ptr->size_rank_info[1].first; 306 mpi_rank = comm.ep_comm_ptr->size_rank_info[2].first; 307 ep_size = comm.ep_comm_ptr->size_rank_info[0].second; 308 num_ep = comm.ep_comm_ptr->size_rank_info[1].second; 309 mpi_size = comm.ep_comm_ptr->size_rank_info[2].second; 310 63 311 64 312 MPI_Datatype datatype = sendtype; …