- Timestamp:
- 07/05/17 14:14:09 (7 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_wait.cpp
r1187 r1196 12 12 using namespace std; 13 13 14 extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests; 15 #pragma omp threadprivate(EP_PendingRequests) 16 14 17 15 18 16 19 namespace ep_lib 17 20 { 18 21 19 22 int MPI_Wait(MPI_Request *request, MPI_Status *status) 20 23 { 21 22 if(request->type == 1) //=>isend 24 if(request->type !=1 && request->type !=2 && request->type !=3) 23 25 { 24 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 25 ::MPI_Status mpi_status; 26 ::MPI_Errhandler_set(MPI_COMM_WORLD_STD, MPI_ERRORS_RETURN); 27 int error_code = ::MPI_Wait(&mpi_request, &mpi_status); 28 if (error_code != MPI_SUCCESS) { 29 30 char error_string[BUFSIZ]; 31 int length_of_error_string, error_class; 32 33 ::MPI_Error_class(error_code, &error_class); 34 ::MPI_Error_string(error_class, error_string, &length_of_error_string); 35 printf("%s\n", error_string); 36 } 37 38 status->mpi_status = &mpi_status; 39 status->ep_src = request->ep_src; 40 status->ep_tag = request->ep_tag; 41 status->ep_datatype = request->ep_datatype; 42 43 return 0; 26 printf("Error in request type\n"); 27 28 exit(1); 44 29 } 45 30 46 if(request->type == 3) //=>imrecv 47 { 48 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 49 ::MPI_Status mpi_status; 50 ::MPI_Errhandler_set(MPI_COMM_WORLD_STD, MPI_ERRORS_RETURN); 51 int error_code = ::MPI_Wait(&mpi_request, &mpi_status); 52 if (error_code != MPI_SUCCESS) { 31 while(request->type == 2) Request_Check(); 32 33 34 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); 35 ::MPI_Status mpi_status; 36 ::MPI_Wait(&mpi_request, &mpi_status); 53 37 54 char error_string[BUFSIZ]; 55 int length_of_error_string, error_class; 56 57 ::MPI_Error_class(error_code, &error_class); 58 ::MPI_Error_string(error_class, error_string, &length_of_error_string); 59 printf("%s\n", error_string); 60 } 61 38 request->mpi_request = mpi_request; 62 39 63 status->mpi_status = new ::MPI_Status(mpi_status); 64 status->ep_src = request->ep_src; 65 status->ep_tag = request->ep_tag; 66 status->ep_datatype = request->ep_datatype; 67 68 //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 69 return 0; 70 71 } 72 73 if(request->type == 2) //=>irecv not probed 74 { 75 76 while(true) 77 { 78 Message_Check(request->comm); 79 // parcours pending list 80 for(std::list<MPI_Request* >::iterator it = (request->pending_ptr)->begin(); it!=(request->pending_ptr)->end(); ) 81 { 82 if(*it == request) 83 { 84 int probed = false; 85 MPI_Message message; 86 87 MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &probed, &message, status); 88 89 if(probed) 90 { 91 int recv_count; 92 MPI_Get_count(status, request->ep_datatype, &recv_count); 93 MPI_Mrecv(request->buf, recv_count, request->ep_datatype, &message, status); 94 (request->pending_ptr)->erase(it); 95 //printf("wait : pending request processed, size = %d\n", (request->pending_ptr)->size()); 96 request->type = 3; 97 98 return 0; 99 } 100 101 it++; 102 } 103 else 104 { 105 int probed = false; 106 MPI_Message message; 107 MPI_Status status; 108 109 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 110 111 if(probed) 112 { 113 int recv_count; 114 MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 115 MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 116 117 (request->pending_ptr)->erase(it); 118 119 it = (request->pending_ptr)->begin(); 120 //printf("wait : pending request processed, size = %d\n", (request->pending_ptr)->size()); 121 } 122 else it++; 123 } 124 } 125 126 } 127 128 129 } 40 status->mpi_status = &mpi_status; 41 status->ep_src = request->ep_src; 42 status->ep_tag = request->ep_tag; 43 status->ep_datatype = request->ep_datatype; 130 44 131 45 return MPI_SUCCESS; … … 140 54 int MPI_Waitall(int count, MPI_Request *array_of_requests, MPI_Status *array_of_statuses) 141 55 { 142 //int dest_rank; 143 //MPI_Comm_rank(MPI_COMM_WORLD, &dest_rank); 144 //printf("proc %d enters waitall\n", dest_rank); 56 std::vector<int> finished(count, 0); 145 57 146 int finished = 0;147 int finished_index[count];58 ::MPI_Request* mpi_request = new ::MPI_Request[count]; 59 ::MPI_Status* mpi_status = new ::MPI_Status[count]; 148 60 149 for(int i=0; i<count; i++) 150 printf("pending add = %p\n", array_of_requests[i].pending_ptr); 61 //if(EP_PendingRequests != 0) printf("pending size = %d, add = %p\n", EP_PendingRequests->size(), EP_PendingRequests); 151 62 152 //if(EP_PendingRequests == 0) EP_PendingRequests = new std::list< MPI_Request* >; 153 //printf("pending size = %d, add = %p\n", EP_PendingRequests->size(), EP_PendingRequests); 154 155 for(int i=0; i<count; i++) 156 { 157 finished_index[i] = false; 158 } 159 160 while(finished < count) 63 while(std::accumulate(finished.begin(), finished.end(), 0) < count) 161 64 { 162 65 163 66 for(int i=0; i<count; i++) 164 67 { 165 if( finished_index[i] == false) // this request has not been tested.68 if(array_of_requests[i].type !=1 && array_of_requests[i].type !=2 && array_of_requests[i].type !=3) 166 69 { 167 if(array_of_requests[i].type == 1 || array_of_requests[i].type == 3) // isend or imrecv 168 { 169 //MPI_Wait(&array_of_requests[i], &array_of_statuses[i]); 170 int tested; 171 MPI_Test(&array_of_requests[i], &tested, &array_of_statuses[i]); 172 if(!tested) MPI_Wait(&array_of_requests[i], &array_of_statuses[i]); 173 finished++; 174 finished_index[i] = true; 175 } 176 else // irecv 177 { 178 179 Message_Check(array_of_requests[i].comm); 180 // parcours pending list 181 for(std::list<MPI_Request* >::iterator it = (array_of_requests[i].pending_ptr)->begin(); it!=(array_of_requests[i].pending_ptr)->end(); ) 182 { 183 bool matched = false; 184 for(int j=0; j<count; j++) 185 { 186 if(*it == &array_of_requests[j]) 187 { 188 int probed = false; 189 MPI_Message message; 190 191 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &array_of_statuses[j]); 192 193 if(probed) 194 { 195 int recv_count; 196 MPI_Get_count(&array_of_statuses[j], array_of_requests[j].ep_datatype, &recv_count); 197 MPI_Mrecv(array_of_requests[j].buf, recv_count, array_of_requests[j].ep_datatype, &message, &array_of_statuses[j]); 198 //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2); 199 (array_of_requests[i].pending_ptr)->erase(it); 200 array_of_requests[j].type = 3; 201 finished++; 202 finished_index[j] = true; 203 matched = true; 204 it = (array_of_requests[i].pending_ptr)->begin(); 205 j=count; 206 //printf("waitall : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size()); 207 //printf("matched = %d, j=%d, src = %d, tag = %d, probed = %d\n", matched, j, (*it)->ep_src, (*it)->ep_tag, probed); 208 } 209 } 210 211 } 212 213 if(!matched) 214 { 215 int probed = false; 216 MPI_Message message; 217 MPI_Status status; 218 219 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 220 221 if(probed) 222 { 223 int recv_count; 224 MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 225 MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 226 227 (array_of_requests[i].pending_ptr)->erase(it); 228 229 it = (array_of_requests[i].pending_ptr)->begin(); 230 //printf("waitall : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size()); 231 } 232 else it++; 233 } 234 } 235 236 } 70 printf("Error in request type\n"); 71 72 exit(1); 73 } 74 75 if(array_of_requests[i].type == 2) Request_Check(); 76 if(array_of_requests[i].type != 2 && finished.at(i) == 0) 77 { 78 finished.at(i) = 1; 79 mpi_request[i] = static_cast< ::MPI_Request >(array_of_requests[i].mpi_request); 237 80 } 238 81 } 239 82 } 240 //printf("proc %d exits waitall\n", dest_rank); 83 84 ::MPI_Waitall(count, mpi_request, mpi_status); 85 86 for(int i=0; i<count; i++) 87 { 88 array_of_statuses[i].mpi_status = &mpi_status; 89 array_of_statuses[i].ep_src = array_of_requests[i].ep_src; 90 array_of_statuses[i].ep_tag = array_of_requests[i].ep_tag; 91 array_of_statuses[i].ep_datatype = array_of_requests[i].ep_datatype; 92 } 93 94 delete[] mpi_request; 95 delete[] mpi_status; 241 96 return MPI_SUCCESS; 242 97 } /* end of mpi_waitall*/ … … 244 99 245 100 } 101
Note: See TracChangeset
for help on using the changeset viewer.