Changeset 1520 for XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp
- Timestamp:
- 06/04/18 19:25:08 (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_openmp/extern/src_ep_dev/ep_message.cpp
r1374 r1520 18 18 namespace ep_lib 19 19 { 20 20 int Request_Check() 21 { 22 if(EP_PendingRequests == 0 ) EP_PendingRequests = new std::list< MPI_Request* >; 23 24 if(EP_PendingRequests->size() == 0) return 0; 25 26 MPI_Status status; 27 MPI_Message *message; 28 int probed = false; 29 int recv_count = 0; 30 std::list<MPI_Request* >::iterator it; 31 32 33 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++) 34 { 35 Message_Check(((*(*it))->comm)); 36 } 37 38 39 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 40 { 41 if((*(*it))->state == 0) 42 { 43 #pragma omp critical (_query0) 44 { 45 MPI_Iprobe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, &status); 46 if(probed) 47 { 48 message = new MPI_Message; 49 *message = new ep_message; 50 51 memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message"); 52 memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message"); 53 54 55 MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->comm, &probed, message, &status); 56 57 } 58 } 59 60 61 if(probed) 62 { 63 MPI_Get_count(&status, (*(*it))->ep_datatype, &recv_count); 64 65 MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it); 66 (*(*it))->type = 3; 67 (*(*it))->state = 1; 68 69 memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status"); 70 delete status.mpi_status; 71 72 memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message"); 73 memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message"); 74 75 delete *message; 76 delete message; 77 78 it++; 79 continue; 80 } 81 } 82 83 if((*(*it))->state == 2) 84 { 85 int ep_rank = ((*(*it))->comm)->ep_comm_ptr->size_rank_info[0].first; 86 memcheck("delete "<< (*(*it)) <<" : in ep_lib::Request_Check, delete (*(*it))"); 87 88 89 int world_rank; 90 MPI_Comm_rank(MPI_COMM_WORLD, &world_rank); 91 if(world_rank==2) 92 { 93 printf("ep %d erased one pending request %p\n", world_rank,*(*it)); 94 } 95 96 EP_PendingRequests->erase(it); 97 98 memcheck("EP_PendingRequests["<<ep_rank<<"]->size() = " << EP_PendingRequests->size()); 99 it = EP_PendingRequests->begin(); 100 continue; 101 } 102 else it++; 103 } 104 } 105 106 107 21 108 int Message_Check(MPI_Comm comm) 22 109 { 23 if(!comm.is_ep) return 0; 24 25 if(comm.is_intercomm) 26 { 27 return Message_Check_intercomm(comm); 28 } 110 if(!comm->is_ep) return MPI_SUCCESS; 111 112 if(comm->is_intercomm) 113 { 114 Message_Check_intercomm(comm); 115 } 116 117 return Message_Check_intracomm(comm); 118 119 } 120 121 122 int Message_Check_intracomm(MPI_Comm comm) 123 { 124 125 int flag = true; 126 ::MPI_Status status; 127 ::MPI_Message message; 128 129 while(flag) // loop until the end of global queue 130 { 131 Debug("Message probing for intracomm\n"); 132 133 #pragma omp critical (_mpi_call) 134 { 135 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status); 136 if(flag) 137 { 138 Debug("find message in mpi comm \n"); 139 ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status); 140 } 141 } 142 143 144 if(flag) 145 { 146 MPI_Message msg = new ep_message; 147 msg->mpi_message = new ::MPI_Message(message); 148 149 memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message"); 150 memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message"); 151 152 153 msg->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 154 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 155 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 156 int src_mpi = status.MPI_SOURCE; 157 158 msg->ep_src = get_ep_rank(comm, src_loc, src_mpi); 159 160 #ifdef _showinfo 161 printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag); 162 #endif 163 164 msg->mpi_status = new ::MPI_Status(status); 165 memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status"); 166 167 #pragma omp critical (_query) 168 { 169 #pragma omp flush 170 comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 171 int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first; 172 memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size()); 173 #pragma omp flush 174 } 175 } 176 } 177 178 return MPI_SUCCESS; 179 } 180 181 182 183 184 185 186 187 int Message_Check_intercomm(MPI_Comm comm) 188 { 189 if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0; 190 191 Debug("Message probing for intercomm\n"); 29 192 30 193 int flag = true; 31 194 ::MPI_Message message; 32 195 ::MPI_Status status; 33 int mpi_source;34 35 while(flag) // loop until the end of global queue36 {37 Debug("Message probing for intracomm\n");38 39 #pragma omp critical (_mpi_call)40 {41 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);42 if(flag)43 {44 Debug("find message in mpi comm \n");45 mpi_source = status.MPI_SOURCE;46 int tag = status.MPI_TAG;47 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);48 49 }50 }51 52 53 if(flag)54 {55 56 MPI_Message *msg_block = new MPI_Message;57 msg_block->mpi_message = new ::MPI_Message;58 *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message;59 msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();60 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong();61 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong();62 int src_mpi = status.MPI_SOURCE;63 64 msg_block->ep_src = get_ep_rank(comm, src_loc, src_mpi);65 msg_block->mpi_status = new ::MPI_Status(status);66 67 MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;68 MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];69 70 71 #pragma omp critical (_query)72 {73 #pragma omp flush74 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);75 #pragma omp flush76 }77 78 delete msg_block;79 }80 81 }82 83 return MPI_SUCCESS;84 }85 86 87 88 int Message_Check_intercomm(MPI_Comm comm)89 {90 if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;91 92 Debug("Message probing for intercomm\n");93 94 int flag = true;95 ::MPI_Message message;96 ::MPI_Status status;97 int mpi_source;98 196 int current_ep_rank; 99 197 MPI_Comm_rank(comm, ¤t_ep_rank); 100 198 101 while(flag) // loop until the end of global queue "comm .ep_comm_ptr->intercomm->mpi_inter_comm"199 while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm" 102 200 { 103 201 Debug("Message probing for intracomm\n"); … … 105 203 #pragma omp critical (_mpi_call) 106 204 { 107 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm .ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);205 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status); 108 206 if(flag) 109 207 { 110 208 Debug("find message in mpi comm \n"); 111 mpi_source = status.MPI_SOURCE; 112 int tag = status.MPI_TAG; 113 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status); 114 209 ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status); 115 210 } 116 211 } … … 120 215 { 121 216 122 MPI_Message *msg_block = new MPI_Message; 123 msg_block->mpi_message = new ::MPI_Message; 124 *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 125 msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 126 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 127 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 128 int src_mpi = status.MPI_SOURCE; 129 int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first; 217 MPI_Message msg = new ep_message; 218 msg->mpi_message = new ::MPI_Message(message); 219 220 memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message"); 221 memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message"); 222 223 224 msg->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 225 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 226 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 227 int src_mpi = status.MPI_SOURCE; 130 228 131 msg _block->ep_src = get_ep_rank_intercomm(comm, src_loc, src_mpi);132 msg_block->mpi_status = new ::MPI_Status(status); 133 134 135 MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list; 136 MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];137 229 msg->ep_src = get_ep_rank(comm, src_loc, src_mpi); 230 #ifdef _showinfo 231 printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag); 232 #endif 233 234 msg->mpi_status = new ::MPI_Status(status); 235 memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status"); 138 236 139 237 #pragma omp critical (_query) 140 238 { 141 239 #pragma omp flush 142 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block); 143 #pragma omp flush 144 } 145 146 delete msg_block; 147 148 } 149 150 } 151 152 flag = true; 153 while(flag) // loop until the end of global queue "comm.mpi_comm" 154 { 155 Debug("Message probing for intracomm\n"); 156 157 #pragma omp critical (_mpi_call) 158 { 159 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status); 160 if(flag) 161 { 162 Debug("find message in mpi comm \n"); 163 mpi_source = status.MPI_SOURCE; 164 int tag = status.MPI_TAG; 165 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status); 166 167 } 168 } 169 170 171 if(flag) 172 { 173 174 MPI_Message *msg_block = new MPI_Message; 175 msg_block->mpi_message = new ::MPI_Message; 176 *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 177 msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 178 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 179 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 180 int src_mpi = status.MPI_SOURCE; 181 182 msg_block->ep_src = get_ep_rank_intercomm(comm, src_loc, src_mpi); 183 msg_block->mpi_status = new ::MPI_Status(status); 184 185 186 MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list; 187 MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc]; 188 189 190 #pragma omp critical (_query) 191 { 192 #pragma omp flush 193 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block); 194 #pragma omp flush 195 } 196 197 delete msg_block; 198 199 } 200 201 } 240 comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 241 memcheck("comm->ep_comm_ptr->comm_list["<<dest_loc<<"]->ep_comm_ptr->message_queue->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size()); 242 #pragma omp flush 243 } 244 } 245 } 246 247 Message_Check_intracomm(comm); 202 248 203 249 return MPI_SUCCESS; 204 250 } 205 251 206 int Request_Check() 207 { 208 MPI_Status status; 209 MPI_Message message; 210 int probed = false; 211 int recv_count = 0; 212 std::list<MPI_Request* >::iterator it; 213 214 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++) 215 { 216 Message_Check((*it)->comm); 217 } 218 219 220 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 221 { 222 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 223 if(probed) 224 { 225 MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 226 MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 227 (*it)->type = 3; 228 EP_PendingRequests->erase(it); 229 it = EP_PendingRequests->begin(); 230 continue; 231 } 232 it++; 233 } 234 } 252 235 253 236 254 }
Note: See TracChangeset
for help on using the changeset viewer.