Changeset 1187 for XIOS/dev/branch_yushan_merged/extern
- Timestamp:
- 06/28/17 17:46:00 (7 years ago)
- Location:
- XIOS/dev/branch_yushan_merged/extern/src_ep_dev
- Files:
-
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_lib.cpp
r1134 r1187 9 9 10 10 namespace ep_lib 11 { 12 11 { 13 12 14 13 int tag_combine(int real_tag, int src, int dest) … … 368 367 369 368 } 369 370 int test_sendrecv(MPI_Comm comm) 371 { 372 int myRank; 373 MPI_Comm_rank(comm, &myRank); 374 bool amClient = false; 375 bool amServer = false; 376 if(myRank<=3) amClient = true; 377 else amServer = true; 378 379 if(amServer) 380 { 381 int send_buf[4]; 382 MPI_Request send_request[8]; 383 MPI_Status send_status[8]; 384 385 386 387 for(int j=0; j<4; j++) // 4 buffers 388 { 389 for(int i=0; i<2; i++) 390 { 391 send_buf[j] = (myRank+1)*100 + j; 392 MPI_Isend(&send_buf[j], 1, MPI_INT, i*2, 9999, comm, &send_request[i*4+j]); 393 } 394 } 395 396 397 MPI_Waitall(8, send_request, send_status); 398 } 399 400 401 if(amClient&&myRank%2==0) // Clients leaders 402 { 403 int recv_buf[8]; 404 MPI_Request recv_request[8]; 405 MPI_Status recv_status[8]; 406 407 for(int i=0; i<2; i++) // 2 servers 408 { 409 for(int j=0; j<4; j++) 410 { 411 MPI_Irecv(&recv_buf[i*4+j], 1, MPI_INT, i+4, 9999, comm, &recv_request[i*4+j]); 412 } 413 } 414 415 MPI_Waitall(8, recv_request, recv_status); 416 printf("============ client %d, recv_buf = %d, %d, %d, %d, %d, %d, %d, %d ================\n", 417 myRank, recv_buf[0], recv_buf[1], recv_buf[2], recv_buf[3], recv_buf[4], recv_buf[5], recv_buf[6], recv_buf[7]); 418 } 419 420 MPI_Barrier(comm); 421 422 } 423 370 424 } 371 425 -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_message.cpp
r1185 r1187 79 79 int dest_mpi = comm.ep_comm_ptr->size_rank_info[2].first; 80 80 int ep_dest = get_ep_rank(comm, dest_loc, dest_mpi); 81 printf("myRank = %d, probed one message, ep_src = %d, ep_dest = %d, tag = %d, message = %d\n", myRank, msg_block->ep_src, ep_dest, msg_block->ep_tag, msg_block->mpi_message);81 //printf("myRank = %d, probed one message, ep_src = %d, ep_dest = %d, tag = %d, message = %d\n", myRank, msg_block->ep_src, ep_dest, msg_block->ep_tag, msg_block->mpi_message); 82 82 msg_block->mpi_status = new ::MPI_Status(status); 83 83 … … 90 90 #pragma omp flush 91 91 ptr_comm_target->ep_comm_ptr->message_queue->push_back(*msg_block); 92 printf("myRank = %d, push_back OK, ep_src = %d, ep_tag = %d, dest = %d(%d)\n", myRank,93 ptr_comm_target->ep_comm_ptr->message_queue->back().ep_src,94 ptr_comm_target->ep_comm_ptr->message_queue->back().ep_tag,95 ep_dest, dest_loc);92 //printf("myRank = %d, push_back OK, ep_src = %d, ep_tag = %d, dest = %d(%d)\n", myRank, 93 // ptr_comm_target->ep_comm_ptr->message_queue->back().ep_src, 94 // ptr_comm_target->ep_comm_ptr->message_queue->back().ep_tag, 95 // ep_dest, dest_loc); 96 96 97 97 #pragma omp flush -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_recv.cpp
r1185 r1187 76 76 request->ep_datatype = datatype; 77 77 78 79 78 80 /* With Improbe*/ 79 81 Message_Check(comm); 80 82 83 if(EP_PendingRequests == 0 ) 84 { 85 EP_PendingRequests = new std::list< MPI_Request* >; 86 printf("proc %d : EP_PendingRequests allocated, add = %p\n", dest_rank, EP_PendingRequests); 87 } 88 89 request->pending_ptr = EP_PendingRequests; 90 printf("proc %d : &EP_PendingRequests add = %p, ptr = %p\n", dest_rank, EP_PendingRequests, request->pending_ptr); 91 81 92 EP_PendingRequests->push_back(request); 82 printf("proc %d : EP_PendingRequests insert one request, add = %p(%p), buf = %p(%p)\n", dest_rank, EP_PendingRequests->back(), request, buf, request->buf);93 //printf("proc %d : EP_PendingRequests insert one request, src = %d, tag = %d, size = %d\n", dest_rank, request->ep_src, request->ep_tag, EP_PendingRequests->size()); 83 94 84 95 // check all EP_PendingRequests 85 96 86 //if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >; 87 if(!EP_PendingRequests->empty()) 97 //printf("proc %d have %d pending irecv request\n", dest_rank, EP_PendingRequests->size()); 98 99 for(std::list<MPI_Request* >::iterator it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 88 100 { 89 printf("proc %d have %d pending irecv request\n", dest_rank, EP_PendingRequests->size()); 90 91 for(std::list<MPI_Request* >::iterator it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 101 if((*it)->type == 3) 92 102 { 93 if((*it)->type != 2) 94 { 95 printf("proc %d : pending request type = %d, src= %d, tag = %d, add = %p skip\n", dest_rank, (*it)->type, (*it)->ep_src, (*it)->ep_tag, *it); 96 EP_PendingRequests->erase(it); 97 it = EP_PendingRequests->begin(); 98 printf("proc %d : pending request processed, size = %d, it = %p\n", dest_rank, EP_PendingRequests->size(), *it); 99 continue; 100 } 103 //printf("proc %d : pending request type = %d, src= %d, tag = %d skip\n", dest_rank, (*it)->type, (*it)->ep_src, (*it)->ep_tag); 104 EP_PendingRequests->erase(it); 105 it = EP_PendingRequests->begin(); 106 //printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size()); 107 continue; 108 } 101 109 102 printf("proc %d : pending irecv request src = %d, tag = %d, type = %d, add = %p\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, (*it)->type, *it);103 104 105 110 //printf("proc %d : pending irecv request src = %d, tag = %d, type = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, (*it)->type); 111 int probed = false; 112 MPI_Message pending_message; 113 MPI_Status pending_status; 106 114 107 108 printf("proc %d : pending irecv request probed to be %d, add = %p\n",dest_rank, probed, *it);115 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &pending_message, &pending_status); 116 //printf("proc %d : pending irecv request probed to be %d, src = %d, tag = %d\n",dest_rank, probed, (*it)->ep_src, (*it)->ep_tag); 109 117 110 if(probed) 111 { 112 int count; 113 MPI_Get_count(&pending_status, (*it)->ep_datatype, &count); 114 MPI_Imrecv((*it)->buf, count, (*it)->ep_datatype, &pending_message, *it); 115 printf("proc %d : pending request is imrecving src = %d, tag = %d, add = %p, buf = %p, count = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, *it, (*it)->buf, count); 116 EP_PendingRequests->erase(it); 117 it = EP_PendingRequests->begin(); 118 printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size()); 119 continue; 120 } 121 else it++; 118 if(probed) 119 { 120 int count; 121 MPI_Get_count(&pending_status, (*it)->ep_datatype, &count); 122 MPI_Imrecv((*it)->buf, count, (*it)->ep_datatype, &pending_message, *it); 123 //printf("proc %d : pending request is imrecving src = %d, tag = %d, count = %d\n", dest_rank, (*it)->ep_src, (*it)->ep_tag, count); 124 EP_PendingRequests->erase(it); 125 it = EP_PendingRequests->begin(); 126 //printf("proc %d : pending request processed, size = %d\n", dest_rank, EP_PendingRequests->size()); 127 continue; 122 128 } 129 130 it++; 123 131 } 124 /*125 int flag = false;126 MPI_Message message;127 MPI_Status status;128 132 129 MPI_Improbe(src, tag, comm, &flag, &message, &status);130 131 if(flag)132 {133 MPI_Imrecv(buf, count, datatype, &message, request);134 printf("proc %d : found message in local queue, src = %d, tag = %d\n", dest_rank, src, tag);135 }136 */137 138 133 return 0; 139 134 } -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_type.hpp
r1185 r1187 453 453 MPI_Request(void* request): mpi_request(request) {} 454 454 #endif 455 456 std::list< MPI_Request* > * pending_ptr; 455 457 }; 456 458 … … 490 492 // <MPI_Fint,thread_num> EP_Comm 491 493 492 static std::list< MPI_Request* > * EP_PendingRequests = new std::list< MPI_Request* >;494 static std::list< MPI_Request* > * EP_PendingRequests = 0; 493 495 #pragma omp threadprivate(EP_PendingRequests) 494 496 } -
XIOS/dev/branch_yushan_merged/extern/src_ep_dev/ep_wait.cpp
r1185 r1187 20 20 { 21 21 22 if(request->type == 1) 22 if(request->type == 1) //=>isend 23 23 { 24 24 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); … … 44 44 } 45 45 46 if(request->type == 2) 47 { 48 int flag = false; 49 MPI_Message message; 50 51 while(!flag) 52 { 53 Message_Check(request->comm); 54 #pragma omp flush 55 MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &flag, &message, status); 56 } 57 58 int count; 59 request->type = 3; 60 MPI_Get_count(status, request->ep_datatype, &count); 61 MPI_Mrecv(request->buf, count, request->ep_datatype, &message, status); 62 status->ep_datatype = request->ep_datatype; 63 64 //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 65 66 return 0; 67 } 68 69 if(request->type == 3) 46 if(request->type == 3) //=>imrecv 70 47 { 71 48 ::MPI_Request mpi_request = static_cast< ::MPI_Request >(request->mpi_request); … … 90 67 91 68 //check_sum_recv(request->buf, count, request->ep_datatype, request->ep_src, request->ep_tag, request->comm, 2); 69 return 0; 70 71 } 72 73 if(request->type == 2) //=>irecv not probed 74 { 75 76 while(true) 77 { 78 Message_Check(request->comm); 79 // parcours pending list 80 for(std::list<MPI_Request* >::iterator it = (request->pending_ptr)->begin(); it!=(request->pending_ptr)->end(); ) 81 { 82 if(*it == request) 83 { 84 int probed = false; 85 MPI_Message message; 86 87 MPI_Improbe(request->ep_src, request->ep_tag, request->comm, &probed, &message, status); 88 89 if(probed) 90 { 91 int recv_count; 92 MPI_Get_count(status, request->ep_datatype, &recv_count); 93 MPI_Mrecv(request->buf, recv_count, request->ep_datatype, &message, status); 94 (request->pending_ptr)->erase(it); 95 //printf("wait : pending request processed, size = %d\n", (request->pending_ptr)->size()); 96 request->type = 3; 97 98 return 0; 99 } 100 101 it++; 102 } 103 else 104 { 105 int probed = false; 106 MPI_Message message; 107 MPI_Status status; 108 109 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 110 111 if(probed) 112 { 113 int recv_count; 114 MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 115 MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 116 117 (request->pending_ptr)->erase(it); 118 119 it = (request->pending_ptr)->begin(); 120 //printf("wait : pending request processed, size = %d\n", (request->pending_ptr)->size()); 121 } 122 else it++; 123 } 124 } 125 126 } 127 128 92 129 } 93 130 … … 111 148 112 149 for(int i=0; i<count; i++) 150 printf("pending add = %p\n", array_of_requests[i].pending_ptr); 151 152 //if(EP_PendingRequests == 0) EP_PendingRequests = new std::list< MPI_Request* >; 153 //printf("pending size = %d, add = %p\n", EP_PendingRequests->size(), EP_PendingRequests); 154 155 for(int i=0; i<count; i++) 113 156 { 114 157 finished_index[i] = false; … … 117 160 while(finished < count) 118 161 { 119 162 120 163 for(int i=0; i<count; i++) 121 164 { 122 165 if(finished_index[i] == false) // this request has not been tested. 123 166 { 124 if(array_of_requests[i].type != 2) // isend or imrecv167 if(array_of_requests[i].type == 1 || array_of_requests[i].type == 3) // isend or imrecv 125 168 { 126 169 //MPI_Wait(&array_of_requests[i], &array_of_statuses[i]); … … 133 176 else // irecv 134 177 { 178 179 Message_Check(array_of_requests[i].comm); 135 180 // parcours pending list 136 // find request in waitall 137 Message_Check(array_of_requests[i].comm); 138 // improbe + mrecv 139 // erase element in pending list 140 // finished++; 141 // finished_index[i] = true; 142 143 144 int flag = false; 145 MPI_Message message; 146 147 MPI_Improbe(array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, &flag, &message, &array_of_statuses[i]); 148 149 if(flag) 181 for(std::list<MPI_Request* >::iterator it = (array_of_requests[i].pending_ptr)->begin(); it!=(array_of_requests[i].pending_ptr)->end(); ) 150 182 { 151 int recv_count; 152 MPI_Get_count(&array_of_statuses[i], array_of_requests[i].ep_datatype, &recv_count); 153 MPI_Mrecv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, &message, &array_of_statuses[i]); 154 //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2); 155 156 finished++; 157 finished_index[i] = true; 183 bool matched = false; 184 for(int j=0; j<count; j++) 185 { 186 if(*it == &array_of_requests[j]) 187 { 188 int probed = false; 189 MPI_Message message; 190 191 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &array_of_statuses[j]); 192 193 if(probed) 194 { 195 int recv_count; 196 MPI_Get_count(&array_of_statuses[j], array_of_requests[j].ep_datatype, &recv_count); 197 MPI_Mrecv(array_of_requests[j].buf, recv_count, array_of_requests[j].ep_datatype, &message, &array_of_statuses[j]); 198 //check_sum_recv(array_of_requests[i].buf, recv_count, array_of_requests[i].ep_datatype, array_of_requests[i].ep_src, array_of_requests[i].ep_tag, array_of_requests[i].comm, 2); 199 (array_of_requests[i].pending_ptr)->erase(it); 200 array_of_requests[j].type = 3; 201 finished++; 202 finished_index[j] = true; 203 matched = true; 204 it = (array_of_requests[i].pending_ptr)->begin(); 205 j=count; 206 //printf("waitall : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size()); 207 //printf("matched = %d, j=%d, src = %d, tag = %d, probed = %d\n", matched, j, (*it)->ep_src, (*it)->ep_tag, probed); 208 } 209 } 210 211 } 212 213 if(!matched) 214 { 215 int probed = false; 216 MPI_Message message; 217 MPI_Status status; 218 219 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 220 221 if(probed) 222 { 223 int recv_count; 224 MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 225 MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 226 227 (array_of_requests[i].pending_ptr)->erase(it); 228 229 it = (array_of_requests[i].pending_ptr)->begin(); 230 //printf("waitall : pending request processed, size = %d\n", (*(array_of_requests[i].pending_ptr))->size()); 231 } 232 else it++; 233 } 158 234 } 235 159 236 } 160 237 }
Note: See TracChangeset
for help on using the changeset viewer.