Changeset 1147 for XIOS/dev/branch_yushan_merged/extern/src_ep_dev
- Timestamp:
- 05/29/17 16:15:38 (7 years ago)
- Location:
- XIOS/dev/branch_yushan_merged/extern/src_ep_dev
- Files:
-
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_alltoall.cpp
r1146 r1147 16 16 int ep_size; 17 17 MPI_Comm_size(comm, &ep_size); 18 18 19 19 20 for(int i=0; i<ep_size; i++) 20 21 { 21 MPI_Gather(sendbuf+i*sendcount*typesize, sendcount, sendtype, recvbuf, recvcount, recvtype, i, comm);22 ep_lib::MPI_Gather(sendbuf+i*sendcount*typesize, sendcount, sendtype, recvbuf, recvcount, recvtype, i, comm); 22 23 } 24 23 25 24 26 return 0; -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_gather.cpp
r1134 r1147 352 352 353 353 void *local_gather_recvbuf; 354 void *master_recvbuf; 355 if(ep_rank_loc == 0 && mpi_rank == root_mpi_rank && root_ep_loc != 0) master_recvbuf = new void*[sizeof(recvbuf)]; 354 356 355 357 if(ep_rank_loc==0) … … 381 383 } 382 384 383 384 ::MPI_Gatherv(local_gather_recvbuf, count*num_ep, static_cast< ::MPI_Datatype>(datatype), recvbuf, gatherv_recvcnt, 385 if(root_ep_loc != 0) // gather to root_master 386 { 387 ::MPI_Gatherv(local_gather_recvbuf, count*num_ep, static_cast< ::MPI_Datatype>(datatype), master_recvbuf, gatherv_recvcnt, 385 388 gatherv_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 389 } 390 else 391 { 392 ::MPI_Gatherv(local_gather_recvbuf, count*num_ep, static_cast< ::MPI_Datatype>(datatype), recvbuf, gatherv_recvcnt, 393 gatherv_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 394 } 386 395 387 396 delete[] gatherv_recvcnt; … … 392 401 if(root_ep_loc != 0 && mpi_rank == root_mpi_rank) // root is not master, master send to root and root receive from master 393 402 { 394 innode_memcpy(0, recvbuf, root_ep_loc, recvbuf, count*ep_size, datatype, comm); 403 innode_memcpy(0, master_recvbuf, root_ep_loc, recvbuf, count*ep_size, datatype, comm); 404 if(ep_rank_loc == 0 ) delete[] master_recvbuf; 395 405 } 396 406 … … 399 409 if(ep_rank_loc==0) 400 410 { 411 401 412 if(datatype == MPI_INT) 402 413 { -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_gatherv.cpp
r1145 r1147 386 386 void *local_gather_recvbuf; 387 387 int buffer_size; 388 void *master_recvbuf; 389 390 if(ep_rank_loc == 0 && mpi_rank == root_mpi_rank && root_ep_loc != 0) master_recvbuf = new void*[sizeof(recvbuf)]; 388 391 389 392 if(ep_rank_loc==0) … … 411 414 ::MPI_Gather(&buff_start, 1, MPI_INT_STD, mpi_displs, 1, MPI_INT_STD, root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 412 415 413 414 ::MPI_Gatherv(local_gather_recvbuf + datasize*buff_start, mpi_sendcnt, static_cast< ::MPI_Datatype>(datatype), recvbuf, mpi_recvcnt,416 if(root_ep_loc == 0) 417 { ::MPI_Gatherv(local_gather_recvbuf + datasize*buff_start, mpi_sendcnt, static_cast< ::MPI_Datatype>(datatype), recvbuf, mpi_recvcnt, 415 418 mpi_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 419 } 420 else // gatherv to master_recvbuf 421 { ::MPI_Gatherv(local_gather_recvbuf + datasize*buff_start, mpi_sendcnt, static_cast< ::MPI_Datatype>(datatype), master_recvbuf, mpi_recvcnt, 422 mpi_displs, static_cast< ::MPI_Datatype>(datatype), root_mpi_rank, static_cast< ::MPI_Comm>(comm.mpi_comm)); 423 } 416 424 417 425 delete[] mpi_recvcnt; … … 425 433 if(root_ep_loc != 0 && mpi_rank == root_mpi_rank) // root is not master, master send to root and root receive from master 426 434 { 427 innode_memcpy(0, recvbuf+datasize*global_min_displs, root_ep_loc, recvbuf+datasize*global_min_displs, global_recvcnt, datatype, comm); 435 innode_memcpy(0, master_recvbuf+datasize*global_min_displs, root_ep_loc, recvbuf+datasize*global_min_displs, global_recvcnt, datatype, comm); 436 if(ep_rank_loc == 0) delete[] master_recvbuf; 428 437 } 429 438 -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_recv.cpp
r1134 r1147 14 14 15 15 16 namespace ep_lib { 16 namespace ep_lib 17 { 17 18 18 19 19 int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Status *status) 20 { 20 21 int dest_rank; 21 22 MPI_Comm_rank(comm, &dest_rank); … … 41 42 42 43 return 0; 43 44 } 44 45 45 46 46 47 47 48 48 49 49 int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int src, int tag, MPI_Comm comm, MPI_Request *request) 50 { 50 51 51 52 Debug("MPI_Irecv with EP"); 52 53 int dest_rank; 53 54 MPI_Comm_rank(comm, &dest_rank); 54 55 55 56 57 58 59 56 if(!comm.is_ep) 57 { 58 ::MPI_Request mpi_request; 59 ::MPI_Comm mpi_comm = static_cast< ::MPI_Comm > (comm.mpi_comm); 60 ::MPI_Irecv(buf, count, static_cast< ::MPI_Datatype> (datatype), src<0? MPI_ANY_SOURCE : src, tag<0? MPI_ANY_TAG: tag, mpi_comm, &mpi_request); 60 61 61 62 request->mpi_request = mpi_request; 62 63 request->ep_src = src; 63 64 request->ep_datatype = datatype; 64 65 request->ep_tag = tag; 65 66 } 66 67 67 68 Message_Check(comm); 68 69 69 70 70 71 71 request->mpi_request = MPI_REQUEST_NULL_STD; 72 request->buf = buf; 72 73 request->comm = comm; 73 74 request->type = 2; 74 75 75 76 request->ep_src = src; 76 77 request->ep_tag = tag; 77 78 request->ep_datatype = datatype; 78 79 79 80 81 /* With Improbe*/ 82 Message_Check(comm); 80 /* With Improbe*/ 81 Message_Check(comm); 83 82 84 83 int flag = false; … … 94 93 95 94 return 0; 96 95 } 97 96 98 99 100 97 int MPI_Imrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Request *request) 98 { 99 Debug("MPI_Imrecv"); 101 100 102 101 request->type = 3; 103 102 104 103 ::MPI_Request mpi_request; 105 104 ::MPI_Message mpi_message = static_cast< ::MPI_Message >(message->mpi_message); 106 105 107 106 ::MPI_Imrecv(buf, count, static_cast< ::MPI_Datatype>(datatype), &mpi_message, &mpi_request); 108 107 109 108 request->mpi_request = mpi_request; … … 114 113 115 114 return 0; 116 115 } 117 116 118 117 119 118 int MPI_Mrecv(void *buf, int count, MPI_Datatype datatype, MPI_Message *message, MPI_Status *status) 120 119 { 121 120 Debug("EP Mrecv called\n"); 122 121 123 122 ::MPI_Status mpi_status; 124 123 ::MPI_Message mpi_message = static_cast< ::MPI_Message >(message->mpi_message); 125 126 124 125 ::MPI_Mrecv(buf, count, static_cast< ::MPI_Datatype>(datatype), &mpi_message, &mpi_status); 127 126 128 127 status->mpi_status = new ::MPI_Status(mpi_status); … … 134 133 135 134 return 0; 136 } 135 } 136 137 } 137 138 138 139 139 140 141 } -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_send.cpp
r1134 r1147 158 158 request->type = 1; // used in wait 159 159 request->comm = comm; 160 request->buf = NULL; 160 161 161 162 -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_wait.cpp
r1146 r1147 14 14 15 15 16 namespace ep_lib { 16 namespace ep_lib 17 { 17 18 18 int MPI_Wait(MPI_Request *request, MPI_Status *status) 19 int MPI_Wait(MPI_Request *request, MPI_Status *status) 20 { 21 22 if(request->type == 1) 19 23 { 24 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 25 ::MPI_Status mpi_status; 26 ::MPI_Wait(&mpi_request, &mpi_status); 20 27 21 if(request->type == 1) 22 { 23 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 24 ::MPI_Status mpi_status; 25 ::MPI_Wait(&mpi_request, &mpi_status); 28 status->mpi_status = &mpi_status; 29 status->ep_src = request->ep_src; 30 status->ep_tag = request->ep_tag; 31 status->ep_datatype = request->ep_datatype; 26 32 33 return 0; 34 } 27 35 36 if(request->type == 2) 37 { 38 int flag = false; 39 MPI_Message message; 28 40 29 status->mpi_status = &mpi_status; 30 status->ep_src = request->ep_src; 31 status->ep_tag = request->ep_tag; 32 status->ep_datatype = request->ep_datatype; 41 while(!flag) 42 { 43 Message_Check(request->comm); 44 #pragma omp flush 45 MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &flag, &message, status); 46 } 33 47 34 return 0; 35 } 48 int count; 49 MPI_Get_count(status, request->ep_datatype, &count); 50 MPI_Mrecv(request->buf, count, request->ep_datatype, &message, status); 51 status->ep_datatype = request->ep_datatype; 36 52 37 if(request->type == 2) 38 { 39 int flag = false; 40 MPI_Message message; 53 //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 41 54 42 while(!flag) 43 { 44 Message_Check(request->comm); 45 #pragma omp flush 46 MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &flag, &message, status); 47 } 55 return 0; 56 } 48 57 49 int count; 50 MPI_Get_count(status, request->ep_datatype, &count); 51 MPI_Mrecv(request->buf, count, request->ep_datatype, &message, status); 52 status->ep_datatype = request->ep_datatype; 58 if(request->type == 3) 59 { 60 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 61 ::MPI_Status mpi_status; 62 ::MPI_Wait(&mpi_request, &mpi_status); 53 63 54 //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 64 status->mpi_status = new ::MPI_Status(mpi_status); 65 status->ep_src = request->ep_src; 66 status->ep_tag = request->ep_tag; 67 status->ep_datatype = request->ep_datatype; 55 68 56 return 0;57 69 //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 70 } 58 71 59 if(request->type == 3) 60 { 61 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 62 ::MPI_Status mpi_status; 63 ::MPI_Wait(&mpi_request, &mpi_status); 72 return MPI_SUCCESS; 64 73 65 status->mpi_status = new ::MPI_Status(mpi_status); 66 status->ep_src = request->ep_src; 67 status->ep_tag = request->ep_tag; 68 status->ep_datatype = request->ep_datatype; 69 70 //int count; 71 //MPI_Get_count(status, request->ep_datatype, &count); 72 //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 73 } 74 return MPI_SUCCESS; 75 } 74 } /*end of mpi_wait*/ 76 75 77 76 … … 80 79 81 80 82 int MPI_Waitall(int count, MPI_Request *array_of_requests, MPI_Status *array_of_statuses) 81 int MPI_Waitall(int count, MPI_Request *array_of_requests, MPI_Status *array_of_statuses) 82 { 83 int dest_rank; 84 MPI_Comm_rank(MPI_COMM_WORLD, &dest_rank); 85 86 int finished = 0; 87 int finished_index[count]; 88 89 for(int i=0; i<count; i++) 83 90 { 84 int dest_rank; 85 MPI_Comm_rank(MPI_COMM_WORLD, &dest_rank); 86 87 int finished = 0; 88 int finished_index[count]; 89 90 for(int i=0; i<count; i++) 91 { 92 finished_index[i] = false; 93 } 94 95 while(finished < count) 96 { 97 for(int i=0; i<count; i++) 98 { 99 //MPI_Test(&array_of_requests[i], &finished_index[i], &array_of_statuses[i]); 100 if(finished_index[i] == false) // this request has not been tested. 101 { 102 if(array_of_requests[i].type != 2) // isend or imrecv 103 { 104 MPI_Wait(&array_of_requests[i], &array_of_statuses[i]); 105 finished++; 106 finished_index[i] = true; 107 } 108 else // irecv 109 { 110 int flag = false; 111 MPI_Message message; 112 113 MPI_Improbe(array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, &flag, &message, &array_of_statuses[i]); 114 115 if(flag) 116 { 117 //printf("dest_rank = %d, Waiting one message with src = %d, tag = %d, buf = %p\n", dest_rank, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].buf); 118 int recv_count; 119 MPI_Get_count(&array_of_statuses[i], array_of_requests[i].ep_datatype, &recv_count); 120 MPI_Mrecv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, &message, &array_of_statuses[i]); 121 //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2); 122 123 finished++; 124 finished_index[i] = true; 125 } 126 } 127 } 128 } 129 } 130 return MPI_SUCCESS; 91 finished_index[i] = false; 131 92 } 132 93 94 while(finished < count) 95 { 96 for(int i=0; i<count; i++) 97 { 98 if(finished_index[i] == false) // this request has not been tested. 99 { 100 if(array_of_requests[i].type != 2) // isend or imrecv 101 { 102 MPI_Wait(&array_of_requests[i], &array_of_statuses[i]); 103 finished++; 104 finished_index[i] = true; 105 } 106 else // irecv 107 { 108 int flag = false; 109 MPI_Message message; 133 110 111 MPI_Improbe(array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, &flag, &message, &array_of_statuses[i]); 134 112 113 if(flag) 114 { 115 int recv_count; 116 MPI_Get_count(&array_of_statuses[i], array_of_requests[i].ep_datatype, &recv_count); 117 MPI_Mrecv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, &message, &array_of_statuses[i]); 118 //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2); 119 120 finished++; 121 finished_index[i] = true; 122 } 123 } 124 } 125 } 126 } 127 return MPI_SUCCESS; 128 } /* end of mpi_waitall*/ 135 129 136 130
Note: See TracChangeset
for help on using the changeset viewer.