Changeset 1500 for XIOS/dev/branch_openmp/extern/ep_dev/ep_message.cpp
- Timestamp:
- 05/28/18 09:54:32 (6 years ago)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/branch_openmp/extern/ep_dev/ep_message.cpp
r1381 r1500 18 18 namespace ep_lib 19 19 { 20 20 int Request_Check() 21 { 22 MPI_Status status; 23 MPI_Message *message; 24 int probed = false; 25 int recv_count = 0; 26 std::list<MPI_Request* >::iterator it; 27 28 if(EP_PendingRequests == 0 ) return 0; 29 30 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++) 31 { 32 Message_Check(((*(*it))->comm)); 33 } 34 35 36 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 37 { 38 if((*(*it))->state == 0) 39 { 40 41 message = new MPI_Message; 42 *message = new ep_message; 43 44 printf("new %p : in ep_lib::Request_Check, message = new MPI_Message\n", message); 45 printf("new %p : in ep_lib::Request_Check, *message = new ep_message\n", *message); 46 47 48 MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, message, &status); 49 printf("in Request_Check, after improbe, mpi_status = %p\n", to_mpi_status_ptr(status)); 50 51 52 if(probed) 53 { 54 ::MPI_Get_count(to_mpi_status_ptr(status), to_mpi_type((*(*it))->ep_datatype), &recv_count); 55 56 printf("in Request_Check, imrecv, buf = %p, recv_count = %d, status = %p\n", (*(*it))->buf, recv_count, to_mpi_status_ptr(status)); 57 MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it); 58 (*(*it))->type = 3; 59 (*(*it))->state = 1; 60 61 62 printf("delete %p : in ep_lib::Request_Check, delete *message\n", *message); 63 printf("delete %p : in ep_lib::Request_Check, delete message\n", message); 64 65 delete *message; 66 delete message; 67 68 it++; 69 continue; 70 } 71 } 72 73 if((*(*it))->state == 2) 74 { 75 printf("delete %p : in ep_lib::Request_Check, delete (*(*it))\n", (*(*it))); 76 delete (*(*it)); 77 78 EP_PendingRequests->erase(it); 79 printf("EP_PendingRequests->size() = %lu\n", EP_PendingRequests->size()); 80 it = EP_PendingRequests->begin(); 81 continue; 82 } 83 else it++; 84 } 85 } 86 87 88 21 89 int Message_Check(MPI_Comm comm) 22 90 { 23 if(!comm .is_ep) return 0;24 25 if(comm .is_intercomm)91 if(!comm->is_ep) return MPI_SUCCESS; 92 93 if(comm->is_intercomm) 26 94 { 27 95 return Message_Check_intercomm(comm); 28 96 } 97 98 return Message_Check_intracomm(comm); 99 100 } 101 102 103 int Message_Check_intracomm(MPI_Comm comm) 104 { 105 106 int flag = true; 107 ::MPI_Status status; 108 ::MPI_Message message; 109 110 while(flag) // loop until the end of global queue 111 { 112 Debug("Message probing for intracomm\n"); 113 114 #pragma omp critical (_mpi_call) 115 { 116 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status); 117 if(flag) 118 { 119 Debug("find message in mpi comm \n"); 120 ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status); 121 } 122 } 123 124 125 if(flag) 126 { 127 MPI_Message msg = new ep_message; 128 msg->mpi_message = new ::MPI_Message(message); 129 130 printf("new %p : in ep_lib::Message_Check, msg = new ep_message\n", msg); 131 printf("new %p : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message\n", msg->mpi_message); 132 133 134 msg->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 135 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 136 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 137 int src_mpi = status.MPI_SOURCE; 138 139 msg->ep_src = get_ep_rank(comm, src_loc, src_mpi); 140 msg->mpi_status = new ::MPI_Status(status); 141 printf("new %p : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status\n", msg->mpi_status); 142 143 #pragma omp critical (_query) 144 { 145 #pragma omp flush 146 comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 147 int test_count; 148 ::MPI_Get_count(static_cast< ::MPI_Status* >(msg->mpi_status), 1275070475, &test_count); 149 printf("status1 = %p, test_count2 = %d\n", static_cast< ::MPI_Status* >(msg->mpi_status), test_count); 150 151 ::MPI_Get_count(static_cast< ::MPI_Status* >(comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->back()->mpi_status), 1275070475, &test_count); 152 printf("status2 = %p, test_count2 = %d\n", static_cast< ::MPI_Status* >(comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->back()->mpi_status), test_count); 153 154 #pragma omp flush 155 } 156 } 157 } 158 159 return MPI_SUCCESS; 160 } 161 162 163 164 165 166 167 168 int Message_Check_intercomm(MPI_Comm comm) 169 { 170 if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0; 171 172 Debug("Message probing for intercomm\n"); 29 173 30 174 int flag = true; 31 175 ::MPI_Message message; 32 176 ::MPI_Status status; 33 int mpi_source;34 35 while(flag) // loop until the end of global queue36 {37 Debug("Message probing for intracomm\n");38 39 #pragma omp critical (_mpi_call)40 {41 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status);42 if(flag)43 {44 Debug("find message in mpi comm \n");45 mpi_source = status.MPI_SOURCE;46 int tag = status.MPI_TAG;47 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status);48 49 }50 }51 52 53 if(flag)54 {55 56 MPI_Message *msg_block = new MPI_Message;57 msg_block->mpi_message = new ::MPI_Message;58 *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message;59 msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong();60 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong();61 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong();62 int src_mpi = status.MPI_SOURCE;63 64 msg_block->ep_src = get_ep_rank(comm, src_loc, src_mpi);65 msg_block->mpi_status = new ::MPI_Status(status);66 67 MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list;68 MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc];69 70 71 #pragma omp critical (_query)72 {73 #pragma omp flush74 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block);75 #pragma omp flush76 }77 78 delete msg_block;79 }80 81 }82 83 return MPI_SUCCESS;84 }85 86 87 88 int Message_Check_intercomm(MPI_Comm comm)89 {90 if(!comm.ep_comm_ptr->intercomm->mpi_inter_comm) return 0;91 92 Debug("Message probing for intercomm\n");93 94 int flag = true;95 ::MPI_Message message;96 ::MPI_Status status;97 int mpi_source;98 177 int current_ep_rank; 99 178 MPI_Comm_rank(comm, ¤t_ep_rank); 100 179 101 while(flag) // loop until the end of global queue "comm .ep_comm_ptr->intercomm->mpi_inter_comm"180 while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm" 102 181 { 103 182 Debug("Message probing for intracomm\n"); … … 105 184 #pragma omp critical (_mpi_call) 106 185 { 107 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm .ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);186 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status); 108 187 if(flag) 109 188 { 110 189 Debug("find message in mpi comm \n"); 111 mpi_source = status.MPI_SOURCE; 112 int tag = status.MPI_TAG; 113 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status); 114 190 ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status); 115 191 } 116 192 } … … 120 196 { 121 197 122 MPI_Message *msg_block = new MPI_Message; 123 msg_block->mpi_message = new ::MPI_Message; 124 *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 125 msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 126 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 127 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 128 int src_mpi = status.MPI_SOURCE; 129 int current_inter = comm.ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first; 198 MPI_Message msg = new ep_message; 199 msg->mpi_message = new ::MPI_Message(message); 200 201 printf("new %p : in ep_lib::Message_Check, msg = new ep_message\n", msg); 202 printf("new %p : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message\n", msg->mpi_message); 203 204 205 msg->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 206 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 207 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 208 int src_mpi = status.MPI_SOURCE; 209 int current_inter = comm->ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first; 130 210 131 msg_block->ep_src = get_ep_rank_intercomm(comm, src_loc, src_mpi); 132 msg_block->mpi_status = new ::MPI_Status(status); 133 134 135 MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list; 136 MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc]; 137 211 msg->ep_src = get_ep_rank(comm, src_loc, src_mpi); 212 msg->mpi_status = new ::MPI_Status(status); 213 printf("new %p : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status\n", msg->mpi_status); 138 214 139 215 #pragma omp critical (_query) 140 216 { 141 217 #pragma omp flush 142 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block); 143 #pragma omp flush 144 } 145 146 delete msg_block; 147 148 } 149 150 } 151 152 flag = true; 153 while(flag) // loop until the end of global queue "comm.mpi_comm" 154 { 155 Debug("Message probing for intracomm\n"); 156 157 #pragma omp critical (_mpi_call) 158 { 159 ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm.mpi_comm), &flag, &status); 160 if(flag) 161 { 162 Debug("find message in mpi comm \n"); 163 mpi_source = status.MPI_SOURCE; 164 int tag = status.MPI_TAG; 165 ::MPI_Mprobe(mpi_source, tag, to_mpi_comm(comm.mpi_comm), &message, &status); 166 167 } 168 } 169 170 171 if(flag) 172 { 173 174 MPI_Message *msg_block = new MPI_Message; 175 msg_block->mpi_message = new ::MPI_Message; 176 *(static_cast< ::MPI_Message*>(msg_block->mpi_message)) = message; 177 msg_block->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); 178 int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); 179 int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); 180 int src_mpi = status.MPI_SOURCE; 181 182 msg_block->ep_src = get_ep_rank_intercomm(comm, src_loc, src_mpi); 183 msg_block->mpi_status = new ::MPI_Status(status); 184 185 186 MPI_Comm* ptr_comm_list = comm.ep_comm_ptr->comm_list; 187 MPI_Comm* ptr_comm_target = &ptr_comm_list[dest_loc]; 188 189 190 #pragma omp critical (_query) 191 { 192 #pragma omp flush 193 comm.ep_comm_ptr->comm_list[dest_loc].ep_comm_ptr->message_queue->push_back(*msg_block); 194 #pragma omp flush 195 } 196 197 delete msg_block; 198 199 } 200 201 } 218 comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); 219 #pragma omp flush 220 } 221 } 222 } 223 224 Message_Check_intracomm(comm); 202 225 203 226 return MPI_SUCCESS; 204 227 } 205 228 206 int Request_Check() 207 { 208 MPI_Status status; 209 MPI_Message message; 210 int probed = false; 211 int recv_count = 0; 212 std::list<MPI_Request* >::iterator it; 213 214 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++) 215 { 216 Message_Check((*it)->comm); 217 } 218 219 220 for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) 221 { 222 MPI_Improbe((*it)->ep_src, (*it)->ep_tag, (*it)->comm, &probed, &message, &status); 223 if(probed) 224 { 225 MPI_Get_count(&status, (*it)->ep_datatype, &recv_count); 226 MPI_Imrecv((*it)->buf, recv_count, (*it)->ep_datatype, &message, *it); 227 (*it)->type = 3; 228 EP_PendingRequests->erase(it); 229 it = EP_PendingRequests->begin(); 230 continue; 231 } 232 it++; 233 } 234 } 229 235 230 236 231 }
Note: See TracChangeset
for help on using the changeset viewer.