Changeset 1196
Timestamp: 07/05/17 14:14:09
Location: XIOS/dev/branch_yushan_merged
Files: 15 edited
Legend: lines beginning with "-" were removed in r1196, lines beginning with "+" were added, unprefixed lines are unchanged context, and "…" separates hunks within a file.
XIOS/dev/branch_yushan_merged/bld.cfg (r1187 → r1196)

    #bld::target test_expand_domain.exe
    #bld::target test_new_features.exe test_unstruct_complete.exe
  - bld::target test_omp.exe #test_complete_omp.exe test_remap_omp.exe test_unstruct_omp.exe
  + bld::target test_omp.exe test_complete_omp.exe test_remap_omp.exe test_unstruct_omp.exe
    #bld::target test_client.exe test_complete.exe #test_xios2_cmip6.exe
    #bld::target test_connectivity_expand.exe
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_fortran.cpp (r1134 → r1196)

      {
        fc_comm_map.insert(std::make_pair( std::make_pair( fint, omp_get_thread_num()) , comm));
  -     //printf("EP_Comm_c2f : MAP insert: %d, %d, %p\n", fint, omp_get_thread_num(), comm.ep_comm_ptr);
  +     //printf("EP_Comm_c2f : MAP %p insert: %d, %d, %p\n", &fc_comm_map, fint, omp_get_thread_num(), comm.ep_comm_ptr);
      }
    }
…
      MPI_Comm comm_ptr;
      comm_ptr = it->second;
  -   //printf("EP_Comm_f2c : MAP find: %d, %d, %p\n", it->first.first, it->first.second, comm_ptr.ep_comm_ptr);
  +   //printf("EP_Comm_f2c : MAP %p find: %d, %d, %p\n", &fc_comm_map, it->first.first, it->first.second, comm_ptr.ep_comm_ptr);
      return comm_ptr;
    }
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_intercomm.cpp (r1134 → r1196)

      MPI_Isend(&leader_ranks[0], 3, MPI_INT_STD, remote_leader, tag, peer_comm, &req_s);
      MPI_Status status;
  +   MPI_Wait(&req_s, &status);
  +
      MPI_Irecv(&leader_ranks[3], 3, MPI_INT_STD, remote_leader, tag, peer_comm, &req_r);
  -   MPI_Wait(&req_s, &status);
      MPI_Wait(&req_r, &status);
    }
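Note: r1196 completes the leader send before posting the receive. A minimal plain-MPI sketch of the same exchange, with illustrative names (the real code goes through the EP wrappers and MPI_INT_STD):

    #include <mpi.h>

    // Two group leaders swap a 3-int rank triple over a peer communicator.
    // Waiting on the send request before posting the receive mirrors the
    // ordering introduced in r1196.
    void exchange_leader_ranks(int leader_ranks[6], int remote_leader,
                               int tag, MPI_Comm peer_comm)
    {
      MPI_Request req_s, req_r;
      MPI_Status status;

      MPI_Isend(&leader_ranks[0], 3, MPI_INT, remote_leader, tag, peer_comm, &req_s);
      MPI_Wait(&req_s, &status);        // complete the send first

      MPI_Irecv(&leader_ranks[3], 3, MPI_INT, remote_leader, tag, peer_comm, &req_r);
      MPI_Wait(&req_r, &status);
    }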
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_lib.cpp (r1187 → r1196)

    using namespace std;

  + std::list< ep_lib::MPI_Request* > * EP_PendingRequests = 0;
  + #pragma omp threadprivate(EP_PendingRequests)

    namespace ep_lib
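Note: this is the pivot of the changeset — the pending-request list moves from a static definition in ep_type.hpp to this single threadprivate definition, so every OpenMP thread (each "endpoint") owns exactly one list, shared across all translation units via extern declarations. A minimal sketch of the pattern, with simplified stand-in types:

    #include <cstdio>
    #include <list>
    #include <omp.h>

    struct Request { int tag; };                 // stand-in for ep_lib::MPI_Request

    static std::list<Request*>* pending = 0;     // one instance per thread
    #pragma omp threadprivate(pending)

    int main()
    {
      #pragma omp parallel num_threads(4)
      {
        if (pending == 0) pending = new std::list<Request*>;  // lazy init, as in ep_recv.cpp
        Request r = { omp_get_thread_num() };
        pending->push_back(&r);
        printf("thread %d: %zu pending\n", omp_get_thread_num(), pending->size());
        delete pending;                          // each thread frees its own copy
        pending = 0;
      }
      return 0;
    }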
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_lib.hpp (r1134 → r1196)

    int Message_Check(MPI_Comm comm);
  + int Request_Check();

    int MPI_Recv (void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status);
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_message.cpp (r1187 → r1196)

    using namespace std;
  +
  + extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
  + #pragma omp threadprivate(EP_PendingRequests)

    namespace ep_lib
…
      #pragma omp flush
      ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block);
  -   printf("probed one message, ep_src = %d, tag = %d, mpi_status = %p (%p), message = %d\n", msg_block->ep_src, msg_block->ep_tag, msg_block->mpi_status, &status, msg_block->mpi_message);
  +   //printf("probed one message, ep_src = %d, tag = %d, mpi_status = %p (%p), message = %d\n", msg_block->ep_src, msg_block->ep_tag, msg_block->mpi_status, &status, msg_block->mpi_message);
      #pragma omp flush
    }
…
  + int Request_Check()
  + {
  +   MPI_Status status;
  +   MPI_Message message;
  +   int probed = false;
  +   int recv_count = 0;
  +   std::list<MPI_Request* >::iterator it;
  +
  +   for(it = EP_PendingRequests->begin(); it != EP_PendingRequests->end(); it++)
  +   {
  +     Message_Check((*it)->comm);
  +   }
  +
  +   for(it = EP_PendingRequests->begin(); it != EP_PendingRequests->end(); )
  +   {
  +     MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status);
  +     if(probed)
  +     {
  +       MPI_Get_count(&status, (*it)->ep_datatype, &recv_count);
  +       MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it);
  +       (*it)->type = 3;
  +       //printf("request add = %p, mpi_request=%d\n", *it, (*it)->mpi_request);
  +       EP_PendingRequests->erase(it);
  +       it = EP_PendingRequests->begin();
  +       continue;
  +     }
  +     it++;
  +   }
  + }

    }
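Note: Request_Check is the new central progress engine — it first drives Message_Check over every pending communicator, then tries to match each pending irecv with the MPI-3 matched-probe pair. (As committed, it is declared int but falls off the end without a return statement.) A minimal plain-MPI sketch of that pair, as an illustrative helper rather than the EP wrappers:

    #include <mpi.h>

    // MPI_Improbe atomically probes *and dequeues* a matching message;
    // MPI_Imrecv then receives exactly that message, so no other thread
    // can steal it between the probe and the receive.
    int try_receive(void* buf, MPI_Datatype type, int src, int tag,
                    MPI_Comm comm, MPI_Request* req)
    {
      int probed = 0;
      MPI_Message msg;
      MPI_Status st;

      MPI_Improbe(src, tag, comm, &probed, &msg, &st);
      if (probed)
      {
        int count;
        MPI_Get_count(&st, type, &count);
        MPI_Imrecv(buf, count, type, &msg, req);
      }
      return probed;
    }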
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_recv.cpp (r1187 → r1196)

    using namespace std;

  + extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
  + #pragma omp threadprivate(EP_PendingRequests)

    namespace ep_lib
…
      int dest_rank;
      MPI_Comm_rank(comm, &dest_rank);
  +   int world_rank;
  +   MPI_Comm_rank(MPI_COMM_WORLD_STD, &world_rank);

      if(!comm.is_ep)
…
      if(EP_PendingRequests == 0 )
      {
        EP_PendingRequests = new std::list< MPI_Request* >;
  -     printf("proc %d : EP_PendingRequests allocated, add = %p\n", dest_rank, EP_PendingRequests);
  +     //printf("proc %d(%d) : EP_PendingRequests allocated, add = %p\n", dest_rank, world_rank, EP_PendingRequests);
      }

  -   request->pending_ptr = EP_PendingRequests;
  -   printf("proc %d : &EP_PendingRequests add = %p, ptr = %p\n", dest_rank, EP_PendingRequests, request->pending_ptr);
  -
      EP_PendingRequests->push_back(request);
  -
  -   // check all EP_PendingRequests
  -   for(std::list<MPI_Request* >::iterator it = EP_PendingRequests->begin(); it != EP_PendingRequests->end(); )
  -   {
  -     if((*it)->type == 3)
  -     {
  -       EP_PendingRequests->erase(it);
  -       it = EP_PendingRequests->begin();
  -       continue;
  -     }
  -
  -     int probed = false;
  -     MPI_Message pending_message;
  -     MPI_Status pending_status;
  -
  -     MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &pending_message, &pending_status);
  -
  -     if(probed)
  -     {
  -       int count;
  -       MPI_Get_count(&pending_status, (*it)->ep_datatype, &count);
  -       MPI_Imrecv((*it)->buf, count, (*it)->ep_datatype, &pending_message, *it);
  -       EP_PendingRequests->erase(it);
  -       it = EP_PendingRequests->begin();
  -       continue;
  -     }
  -
  -     it++;
  -   }
  +   Request_Check();
  +   // (the old inline scan of EP_PendingRequests is retained in the file as commented-out code)

      return 0;
…
      request->ep_tag = message->ep_tag;
      request->ep_src = message->ep_src;
  -   //request->buf = buf;

      return 0;
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_send.cpp (r1185 → r1196)

      return 0;
    }
  +
  + int MPI_Bsend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm)
  + {
  +   if(!comm.is_ep)
  +   {
  +     ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm > (comm.mpi_comm);
  +     ::MPI_Bsend(buf, count, static_cast< ::MPI_Datatype>(datatype), dest, tag, mpi_comm);
  +     return 0;
  +   }
  +
  +   MPI_Request request;
  +   MPI_Status status;
  +   //MPI_Ibsend(buf, count, datatype, dest, tag, comm, &request);
  +   MPI_Wait(&request, &status);
  +
  +   //check_sum_send(buf, count, datatype, dest, tag, comm);
  +
  +   return 0;
  + }
…
      request->buf = const_cast<void*>(buf);

  -   Message_Check(comm);
  +   //Message_Check(comm);

      return 0;
…
      return 0;
    }
  +
  + int MPI_Ibsend(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
  + {
  +   Debug("\nMPI_Isend with EP\n");
  +   int src_rank;
  +   MPI_Comm_rank(comm, &src_rank);
  +
  +   if(!comm.is_ep)
  +   {
  +     ::MPI_Request mpi_request;
  +     ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm > (comm.mpi_comm);
  +     ::MPI_Ibsend(buf, count, static_cast< ::MPI_Datatype> (datatype), dest, tag, mpi_comm, &mpi_request);
  +
  +     request->mpi_request = mpi_request;
  +     request->ep_src = src_rank;
  +     request->ep_tag = tag;
  +     request->ep_datatype = datatype;
  +     request->type = 1;
  +     request->comm = comm;
  +
  +     return 0;
  +   }
  +
  +   if(comm.is_intercomm) return 0; //MPI_Ibsend_intercomm(buf, count, datatype, dest, tag, comm, request);
  +
  +   // EP intracomm
  +   //check_sum_send(buf, count, datatype, dest, tag, comm, 1);
  +
  +   int ep_src_loc = comm.ep_comm_ptr->size_rank_info[1].first;
  +   int ep_dest_loc = comm.ep_comm_ptr->comm_list->rank_map->at(dest).first;
  +   int mpi_tag = tag_combine(tag, ep_src_loc, ep_dest_loc);
  +   int mpi_dest = comm.ep_comm_ptr->comm_list->rank_map->at(dest).second;
  +
  +   request->ep_src = src_rank;
  +   request->ep_tag = tag;
  +   request->ep_datatype = datatype;
  +
  +   ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm> (comm.mpi_comm);
  +   ::MPI_Request mpi_request;
  +
  +   ::MPI_Ibsend(buf, count, static_cast< ::MPI_Datatype>(datatype), mpi_dest, mpi_tag, mpi_comm, &mpi_request);
  +
  +   request->mpi_request = mpi_request;
  +   request->type = 1; // used in wait
  +   request->comm = comm;
  +   request->buf = const_cast<void*>(buf);
  +
  +   //Message_Check(comm);
  +
  +   return 0;
  + }
…
    }

  -   Message_Check(comm);
  +   //Message_Check(comm);
…
    }

  -   Message_Check(comm);
  +   //Message_Check(comm);
…
    }
  +
  + int MPI_Ibsend_intercomm(const void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request)
  + {
  +   printf("MPI_Ibsend with intercomm not yet implemented\n");
  +   MPI_Abort(comm, 0);
  +   //check_sum_send(buf, count, datatype, dest, tag, comm, 1);
  +
  +   int dest_remote_ep_rank = comm.ep_comm_ptr->intercomm->remote_rank_map->at(dest).first;
  +   int dest_remote_comm_label = comm.ep_comm_ptr->intercomm->remote_rank_map->at(dest).second;
  +
  +   int src_ep_rank = comm.ep_comm_ptr->intercomm->size_rank_info[0].first;
  +   int src_comm_label;
  +
  +   for(int i=0; i<comm.ep_comm_ptr->intercomm->local_rank_map->size(); i++)
  +   {
  +     if(comm.ep_comm_ptr->intercomm->local_rank_map->at(i).first == src_ep_rank)
  +     {
  +       src_comm_label = comm.ep_comm_ptr->intercomm->local_rank_map->at(i).second;
  +       break;
  +     }
  +   }
  +
  +   //Message_Check(comm);
  +
  +   if(dest_remote_comm_label == src_comm_label) // mpi_dest differs
  +   {
  +     int inter_src = comm.ep_comm_ptr->intercomm->local_rank_map->at(src_ep_rank).first;
  +     int ep_src_loc = comm.rank_map->at(inter_src).first;
  +     int ep_dest_loc = comm.rank_map->at(dest_remote_ep_rank).first;
  +     int mpi_dest = comm.rank_map->at(dest_remote_ep_rank).second;
  +     int mpi_tag = tag_combine(tag, ep_src_loc, ep_dest_loc);
  +
  +     ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm > (comm.mpi_comm);
  +     ::MPI_Request mpi_request;
  +
  +     ::MPI_Isend(buf, count, static_cast< ::MPI_Datatype >(datatype), mpi_dest, mpi_tag, mpi_comm, &mpi_request);
  +
  +     request->mpi_request = mpi_request;
  +     request->type = 1; // used in wait
  +     request->comm = comm;
  +
  +     request->ep_src = src_ep_rank;
  +     request->ep_tag = tag;
  +     request->ep_datatype = datatype;
  +   }
  +   else // dest_remote_comm_label != src_comm_label
  +   {
  +     int inter_src = comm.ep_comm_ptr->intercomm->local_rank_map->at(src_ep_rank).first;
  +     int ep_src_loc = comm.rank_map->at(inter_src).first;
  +     int ep_dest_loc = comm.ep_comm_ptr->intercomm->intercomm_rank_map->at(dest_remote_ep_rank).first;
  +     int mpi_dest = comm.ep_comm_ptr->intercomm->intercomm_rank_map->at(dest_remote_ep_rank).second;
  +     int mpi_tag = tag_combine(tag, ep_src_loc, ep_dest_loc);
  +
  +     ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm >(comm.ep_comm_ptr->intercomm->mpi_inter_comm);
  +     ::MPI_Request mpi_request;
  +
  +     ::MPI_Isend(buf, count, static_cast< ::MPI_Datatype >(datatype), mpi_dest, mpi_tag, mpi_comm, &mpi_request);
  +
  +     request->mpi_request = mpi_request;
  +     request->type = 1; // used in wait
  +     request->comm = comm;
  +
  +     request->ep_src = src_ep_rank;
  +     request->ep_tag = tag;
  +     request->ep_datatype = datatype;
  +   }
  +
  +   return 0;
  + }
    }
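Note: the Message_Check(comm) calls disappear from the send paths because progress now appears to be driven from the receive side via Request_Check. Also, as committed, the EP branch of MPI_Bsend calls MPI_Wait on a request that nothing initialised (its MPI_Ibsend line is commented out), and MPI_Ibsend_intercomm aborts before any of the code that follows. The pattern MPI_Bsend presumably aims for is "start buffered send, then wait", sketched here in plain MPI (buffered sends additionally require the caller to have attached a buffer with MPI_Buffer_attach):

    #include <mpi.h>

    int bsend_via_ibsend(const void* buf, int count, MPI_Datatype type,
                         int dest, int tag, MPI_Comm comm)
    {
      MPI_Request req;
      MPI_Status st;
      MPI_Ibsend(buf, count, type, dest, tag, comm, &req);  // start buffered send
      return MPI_Wait(&req, &st);                           // complete it
    }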
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_type.hpp (r1187 → r1196)

      #endif

  -   std::list< MPI_Request* > * pending_ptr;
  +   bool operator == (MPI_Request right)
  +   {
  +     //bool a = mpi_request == right.mpi_request;
  +     bool b = type == right.type;
  +     bool c = buf == right.buf;
  +     bool d = ep_src == right.ep_src;
  +     bool e = ep_tag == right.ep_tag;
  +     bool f = ep_datatype == right.ep_datatype;
  +     return b&&c&&d&&e&&f;
  +   }
    };
…
    // <MPI_Fint,thread_num> EP_Comm

  - static std::list< MPI_Request* > * EP_PendingRequests = 0;
  - #pragma omp threadprivate(EP_PendingRequests)
  + //static std::list<MPI_Request * > *EP_PendingRequests = 0;
    }

    #endif // EP_TYPE_HPP_INCLUDED
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_wait.cpp (r1187 → r1196)

    using namespace std;

  + extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
  + #pragma omp threadprivate(EP_PendingRequests)

    namespace ep_lib
    {

    int MPI_Wait(MPI_Request *request, MPI_Status *status)
    {
  - (removed: the old per-type implementation — type 1 (isend) and type 3 (imrecv)
  -  each called ::MPI_Wait with MPI_Errhandler_set/MPI_Error_class/MPI_Error_string
  -  diagnostics, while type 2 (an irecv not yet probed) looped over Message_Check
  -  and scanned request->pending_ptr inline with MPI_Improbe/MPI_Mrecv/MPI_Imrecv)
  +   if(request->type !=1 && request->type !=2 && request->type !=3)
  +   {
  +     printf("Error in request type\n");
  +
  +     exit(1);
  +   }
  +
  +   while(request->type == 2) Request_Check();
  +
  +   ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request);
  +   ::MPI_Status mpi_status;
  +   ::MPI_Wait(&mpi_request, &mpi_status);
  +
  +   request->mpi_request = mpi_request;
  +
  +   status->mpi_status = &mpi_status;
  +   status->ep_src = request->ep_src;
  +   status->ep_tag = request->ep_tag;
  +   status->ep_datatype = request->ep_datatype;

      return MPI_SUCCESS;
…
    int MPI_Waitall(int count, MPI_Request *array_of_requests, MPI_Status *array_of_statuses)
    {
  - (removed: the old implementation tracked completion in a local finished_index
  -  array, used MPI_Test/MPI_Wait for isend and imrecv requests, and for unprobed
  -  irecvs drove Message_Check and scanned each request's pending_ptr list with
  -  MPI_Improbe, MPI_Mrecv and MPI_Imrecv until every request completed)
  +   std::vector<int> finished(count, 0);
  +
  +   ::MPI_Request* mpi_request = new ::MPI_Request[count];
  +   ::MPI_Status* mpi_status = new ::MPI_Status[count];
  +
  +   //if(EP_PendingRequests != 0) printf("pending size = %d, add = %p\n", EP_PendingRequests->size(), EP_PendingRequests);
  +
  +   while(std::accumulate(finished.begin(), finished.end(), 0) < count)
  +   {
  +     for(int i=0; i<count; i++)
  +     {
  +       if(array_of_requests[i].type !=1 && array_of_requests[i].type !=2 && array_of_requests[i].type !=3)
  +       {
  +         printf("Error in request type\n");
  +
  +         exit(1);
  +       }
  +
  +       if(array_of_requests[i].type == 2) Request_Check();
  +       if(array_of_requests[i].type != 2 && finished.at(i) == 0)
  +       {
  +         finished.at(i) = 1;
  +         mpi_request[i] = static_cast< ::MPI_Request >(array_of_requests[i].mpi_request);
  +       }
  +     }
  +   }
  +
  +   ::MPI_Waitall(count, mpi_request, mpi_status);
  +
  +   for(int i=0; i<count; i++)
  +   {
  +     array_of_statuses[i].mpi_status = &mpi_status;
  +     array_of_statuses[i].ep_src = array_of_requests[i].ep_src;
  +     array_of_statuses[i].ep_tag = array_of_requests[i].ep_tag;
  +     array_of_statuses[i].ep_datatype = array_of_requests[i].ep_datatype;
  +   }
  +
  +   delete[] mpi_request;
  +   delete[] mpi_status;
      return MPI_SUCCESS;
    } /* end of mpi_waitall*/
XIOS/dev/branch_yushan_merged/src/client.cpp (r1187 → r1196)

  -   test_sendrecv(CXios::globalComm);
  +   //test_sendrecv(CXios::globalComm);
      MPI_Intercomm_create(intraComm,0,CXios::globalComm,serverLeader,0,&interComm) ;
XIOS/dev/branch_yushan_merged/src/client_client_dht_template_impl.hpp (r1185 → r1196)

      recvIndexBuff = new unsigned long[recvNbIndexCount];

  -   std::vector<ep_lib::MPI_Request> request;
  +   int request_size = 0;
  +   int currentIndex = 0;
  +   int nbRecvClient = recvRankClient.size();
  +   int position = 0;
  +
  +   for (int idx = 0; idx < nbRecvClient; ++idx)
  +   {
  +     if (0 != recvNbIndexClientCount[idx])
  +     {
  +       request_size++;
  +     }
  +   }
  +
  +   request_size += client2ClientIndex.size();
  +
  +   std::vector<ep_lib::MPI_Request> request(request_size);
  +
      std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                                 iteRecvIndex = recvRankClient.end(),
                                 itbRecvNbIndex = recvNbIndexClientCount.begin(),
                                 itRecvNbIndex;
  -   int currentIndex = 0;
  -   int nbRecvClient = recvRankClient.size();

      boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                    iteIndex = client2ClientIndex.end();
      for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
  -     sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  +   {
  +     MPI_Isend(itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG, itIndex->first, MPI_DHT_INDEX, commLevel, &request[position]);
  +     position++;
  +   }

      for (int idx = 0; idx < nbRecvClient; ++idx)
      {
        if (0 != recvNbIndexClientCount[idx])
        {
  -       recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
  +       MPI_Irecv(recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], MPI_UNSIGNED_LONG,
  +                 recvRankClient[idx], MPI_DHT_INDEX, commLevel, &request[position]);
  +       position++;
        }
        currentIndex += recvNbIndexClientCount[idx];
      }

  -   std::vector<ep_lib::MPI_Status> status(request.size());
  +   std::vector<ep_lib::MPI_Status> status(request_size);
      MPI_Waitall(request.size(), &request[0], &status[0]);
…
  -   std::vector<ep_lib::MPI_Request> requestOnReturn;
  +   request_size = 0;
  +   for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  +   {
  +     if (0 != recvNbIndexOnReturn[idx])
  +     {
  +       request_size += 2;
  +     }
  +   }
  +
  +   for (int idx = 0; idx < nbRecvClient; ++idx)
  +   {
  +     if (0 != sendNbIndexOnReturn[idx])
  +     {
  +       request_size += 2;
  +     }
  +   }
  +
  +   std::vector<ep_lib::MPI_Request> requestOnReturn(request_size);
      currentIndex = 0;
  +   position = 0;
      for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
      {
        if (0 != recvNbIndexOnReturn[idx])
        {
  -       recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
  -       recvInfoFromClients(recvRankOnReturn[idx],
  -                           recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
  -                           recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
  -                           commLevel, requestOnReturn);
  +       MPI_Irecv(recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], MPI_UNSIGNED_LONG,
  +                 recvRankOnReturn[idx], MPI_DHT_INDEX, commLevel, &requestOnReturn[position]);
  +       position++;
  +       MPI_Irecv(recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
  +                 recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), MPI_CHAR,
  +                 recvRankOnReturn[idx], MPI_DHT_INFO, commLevel, &requestOnReturn[position]);
  +       position++;
        }
        currentIndex += recvNbIndexOnReturn[idx];
      }
…
  -     sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
  -                        sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
  -     sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
  -                       sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
  +     MPI_Isend(client2ClientIndexOnReturn[rank], sendNbIndexOnReturn[idx], MPI_UNSIGNED_LONG,
  +               rank, MPI_DHT_INDEX, commLevel, &requestOnReturn[position]);
  +     position++;
  +     MPI_Isend(client2ClientInfoOnReturn[rank], sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), MPI_CHAR,
  +               rank, MPI_DHT_INFO, commLevel, &requestOnReturn[position]);
  +     position++;
      }
      currentIndex += recvNbIndexClientCount[idx];
…
      int recvNbIndexCount = 0;
      for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
  +   {
        recvNbIndexCount += recvNbIndexClientCount[idx];
  +   }
…
      // Contents of the message are index and its corresponding informatioin
  -   std::vector<ep_lib::MPI_Request> request;
  +   int request_size = 0;
      int currentIndex = 0;
      int nbRecvClient = recvRankClient.size();
  +   int current_pos = 0;
  +
      for (int idx = 0; idx < nbRecvClient; ++idx)
      {
        if (0 != recvNbIndexClientCount[idx])
        {
  -       recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
  -       recvInfoFromClients(recvRankClient[idx],
  -                           recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
  -                           recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
  -                           commLevel, request);
  +       request_size += 2;
        }
  -     currentIndex += recvNbIndexClientCount[idx];
  +     //currentIndex += recvNbIndexClientCount[idx];
      }
  +
  +   request_size += client2ClientIndex.size();
  +   request_size += client2ClientInfo.size();
  +
  +   std::vector<ep_lib::MPI_Request> request(request_size);
  +
  +   //unsigned long* tmp_send_buf_long[client2ClientIndex.size()];
  +   //unsigned char* tmp_send_buf_char[client2ClientInfo.size()];
  +
  +   int info_position = 0;
  +   int index_position = 0;

      boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
…
      for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
      {
  -     sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  +     //(a commented-out variant that stages the data in tmp_send_buf_long precedes the send)
  +     MPI_Isend(itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG,
  +               itIndex->first, MPI_DHT_INDEX, commLevel, &request[current_pos]);
  +     current_pos++;
  +     index_position++;
      }
…
      for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
      {
  -     sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
  +     //(a commented-out variant that stages the data in tmp_send_buf_char precedes the send)
  +     MPI_Isend(itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], MPI_CHAR,
  +               itInfo->first, MPI_DHT_INFO, commLevel, &request[current_pos]);
  +     current_pos++;
  +     info_position++;
      }
  +
  +   for (int idx = 0; idx < nbRecvClient; ++idx)
  +   {
  +     if (0 != recvNbIndexClientCount[idx])
  +     {
  +       MPI_Irecv(recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], MPI_UNSIGNED_LONG,
  +                 recvRankClient[idx], MPI_DHT_INDEX, commLevel, &request[current_pos]);
  +       current_pos++;
  +
  +       MPI_Irecv(recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
  +                 recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
  +                 MPI_CHAR, recvRankClient[idx], MPI_DHT_INFO, commLevel, &request[current_pos]);
  +       current_pos++;
  +     }
  +     currentIndex += recvNbIndexClientCount[idx];
  +   }

      std::vector<ep_lib::MPI_Status> status(request.size());

      MPI_Waitall(request.size(), &request[0], &status[0]);
  +
  +   //(commented-out cleanup of tmp_send_buf_char and tmp_send_buf_long follows)
…
      else
        index2InfoMapping_.swap(indexToInfoMapping);
…
      std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
      std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
  +   //ep_lib::MPI_Request request[sendBuffSize+recvBuffSize];
  +   //ep_lib::MPI_Status requestStatus[sendBuffSize+recvBuffSize];

      int my_rank;
      MPI_Comm_rank(this->internalComm_, &my_rank);

      int nRequest = 0;
  +   for (int idx = 0; idx < recvBuffSize; ++idx)
  +   {
  +     MPI_Irecv(&recvBuff[2*idx], 2, MPI_INT,
  +               recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
  +     ++nRequest;
  +   }
…
  -   for (int idx = 0; idx < recvBuffSize; ++idx)
  -   {
  -     MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
  -               recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
  -     ++nRequest;
  -   }

      //MPI_Barrier(this->internalComm_);

      MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  +   //MPI_Waitall(sendBuffSize+recvBuffSize, request, requestStatus);
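Note: the recurring pattern in this file — count request_size first, construct the std::vector<ep_lib::MPI_Request> at full size, then hand out &request[position] — presumably matters because the EP layer stores raw MPI_Request pointers in its threadprivate pending list; if the vector reallocated (as push_back through the old helper functions could cause), those pointers would dangle. A plain-MPI sketch of the shape (illustrative; with standard MPI the address only needs to be valid at the call, but a fixed size still guarantees no reallocation):

    #include <mpi.h>
    #include <vector>

    void exchange(const std::vector<int>& peers, int* sendbuf, int* recvbuf,
                  MPI_Comm comm)
    {
      const int n = static_cast<int>(peers.size());
      std::vector<MPI_Request> request(2 * n);   // sized once: elements never move
      int position = 0;

      for (int i = 0; i < n; ++i)
        MPI_Irecv(&recvbuf[i], 1, MPI_INT, peers[i], 0, comm, &request[position++]);
      for (int i = 0; i < n; ++i)
        MPI_Isend(&sendbuf[i], 1, MPI_INT, peers[i], 0, comm, &request[position++]);

      std::vector<MPI_Status> status(2 * n);
      MPI_Waitall(2 * n, request.data(), status.data());
    }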
XIOS/dev/branch_yushan_merged/src/server.cpp (r1187 → r1196)

      <<" intraCommRank :"<<intraCommRank<<" clientLeader "<< clientLeader<<endl ;

  -   test_sendrecv(CXios::globalComm);
  +   //test_sendrecv(CXios::globalComm);
      MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ;
      interComm.push_back(newComm) ;
XIOS/dev/branch_yushan_merged/src/test/test_omp.f90 (r1134 → r1196)

      if(rank < size-2) then

  -   !$omp parallel default(private)
  +   !$omp parallel default(firstprivate)

      CALL xios_initialize(id,return_comm=comm)
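Note: default(private) gives each thread an uninitialised copy of every variable, so values like id and comm entered the parallel region indeterminate; default(firstprivate) initialises each thread's copy from the value the variable held outside the region. The same distinction, illustrated in C++ OpenMP:

    #include <cstdio>
    #include <omp.h>

    int main()
    {
      int id = 42;

      // firstprivate: every thread starts with id == 42.
      // With private(id), each copy would be uninitialised until assigned.
      #pragma omp parallel firstprivate(id) num_threads(2)
      {
        printf("thread %d sees id = %d\n", omp_get_thread_num(), id);
      }
      return 0;
    }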
XIOS/dev/branch_yushan_merged/src/transformation/grid_transformation.cpp (r1134 → r1196)

      // Sending global index of grid source to corresponding process as well as the corresponding mask
      std::vector<ep_lib::MPI_Request> requests;
  +   requests.reserve(2*recvRankSizeMap.size()+2*globaIndexWeightFromSrcToDst.size());
      std::vector<ep_lib::MPI_Status> status;
      boost::unordered_map<int, unsigned char* > recvMaskDst;
…
      std::vector<ep_lib::MPI_Request>().swap(requests);
      std::vector<ep_lib::MPI_Status>().swap(status);
  +   requests.reserve(sendRankSizeMap.size()+recvRankSizeMap.size());
      // Okie, on destination side, we will wait for information of masked index of source
      for (std::map<int,int>::const_iterator itSend = sendRankSizeMap.begin(); itSend != sendRankSizeMap.end(); ++itSend)
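Note: same pointer-stability concern as in the DHT file — here the requests are appended with push_back, so reserving the final capacity up front keeps the elements (and any pointers the EP layer may hold to them) from moving on reallocation. A sketch of the shape, with an illustrative helper:

    #include <mpi.h>
    #include <vector>

    void post_recvs(std::vector<MPI_Request>& requests, std::vector<int>& buf,
                    const std::vector<int>& ranks, MPI_Comm comm)
    {
      requests.reserve(requests.size() + ranks.size());  // capacity fixed up front
      for (std::size_t i = 0; i < ranks.size(); ++i)
      {
        requests.push_back(MPI_Request());
        MPI_Irecv(&buf[i], 1, MPI_INT, ranks[i], 0, comm, &requests.back());
      }
    }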