/*!
   \file ep_message.cpp
   \since 2 may 2016

   \brief Definitions of MPI endpoint function: Message_Check
 */

#include "ep_lib.hpp"
#include <mpi.h>   // NOTE(review): the header name was lost in the source as received; <mpi.h> is inferred from the ::MPI_* calls below - confirm.
#include <bitset>
#include <cstdio>
#include <list>
#include "ep_declaration.hpp"
#include "ep_mpi.hpp"

using namespace std;

// Per-thread list of requests that Request_Check() still has to service.
extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests;
#pragma omp threadprivate(EP_PendingRequests)

namespace ep_lib
{
  namespace
  {
    /*!
       \brief Tells whether a pending-request entry can be dropped from the list.

       An entry is dropped when the request handle it points to is null or when
       the request's state is 2.
       NOTE(review): state 2 presumably means "completed" - confirm against the
       ep_request definition.
     */
    inline bool request_is_finished(MPI_Request *request)
    {
      return *request == 0 || (*request)->state == 2;
    }
  }

  /*!
     \brief Services the calling thread's list of pending requests.

     The list is created on first use. The function then
       1. drops finished entries and calls Message_Check() on the communicator
          of every remaining request;
       2. for every remaining request whose state is 0, probes for a matching
          message; on a match it posts the matched receive (MPI_Imrecv) and
          sets the request's type to 3 and its state to 1.

     \return 0 in all cases.
   */
  int Request_Check()
  {
    if(EP_PendingRequests == 0) EP_PendingRequests = new std::list< MPI_Request* >;

    if(EP_PendingRequests->empty()) return 0;

    show_EP_PendingRequests(EP_PendingRequests);

    MPI_Status status;
    MPI_Message *message = 0;
    int probed = false;
    int recv_count = 0;
    std::list< MPI_Request* >::iterator it;

    // Pass 1: prune finished entries and drain the MPI queues of the
    // communicators that still have requests pending.
    // erase() returns the iterator following the removed node, so the loop
    // neither skips an element nor steps past end() when the list empties.
    for(it = EP_PendingRequests->begin(); it != EP_PendingRequests->end(); )
    {
      if(request_is_finished(*it))
      {
        it = EP_PendingRequests->erase(it);
        // NOTE(review): the original message carried an index between
        // "EP_PendingRequests[" and "]"; that expression was lost in the
        // source as received, so only the size is reported here.
        memcheck("EP_PendingRequests->size() = " << EP_PendingRequests->size());
        continue;
      }

      Message_Check((*(*it))->comm);
      ++it;
    }

    // Pass 2: try to match every request whose state is 0 with a queued message.
    for(it = EP_PendingRequests->begin(); it != EP_PendingRequests->end(); )
    {
      if(request_is_finished(*it))
      {
        it = EP_PendingRequests->erase(it);
        memcheck("EP_PendingRequests->size() = " << EP_PendingRequests->size());
        continue;
      }

      MPI_Request *request = *it;

      if((*request)->state == 0)
      {
        #pragma omp critical (_query0)
        {
          MPI_Iprobe((*request)->ep_src, (*request)->ep_tag, (*request)->comm, &probed, &status);
          if(probed)
          {
            message = new MPI_Message;
            *message = new ep_message;
            memcheck("new "<< message <<" : in ep_lib::Request_Check, message = new MPI_Message");
            memcheck("new "<< *message <<" : in ep_lib::Request_Check, *message = new ep_message");

            // NOTE(review): if this call clears `probed`, the two allocations
            // above are never released - confirm whether that can happen.
            MPI_Improbe((*request)->ep_src, (*request)->ep_tag, (*request)->comm, &probed, message, &status);
          }
        }

        if(probed)
        {
          MPI_Get_count(&status, (*request)->ep_datatype, &recv_count);
          MPI_Imrecv((*request)->buf, recv_count, (*request)->ep_datatype, message, request);
          (*request)->type = 3;
          (*request)->state = 1;

          memcheck("delete "<< status.mpi_status <<" : in ep_lib::Request_Check, delete status.mpi_status");
          delete status.mpi_status;

          memcheck("delete "<< *message <<" : in ep_lib::Request_Check, delete *message");
          memcheck("delete "<< message <<" : in ep_lib::Request_Check, delete message");
          delete *message;
          delete message;
        }
      }

      ++it;
    }

    show_EP_PendingRequests(EP_PendingRequests);

    // The function is declared int; the original fell off the end here.
    return 0;
  }

  /*!
     \brief Drains the MPI queue(s) behind an endpoint communicator.

     Does nothing for a communicator that is not an endpoint communicator.
     For an inter-communicator Message_Check_intercomm() is called first;
     Message_Check_intracomm() is then called in every case.

     \param comm communicator to check.
     \return MPI_SUCCESS if comm is not an endpoint communicator, otherwise the
             result of Message_Check_intracomm().
   */
  int Message_Check(MPI_Comm comm)
  {
    if(!comm->is_ep) return MPI_SUCCESS;

    if(comm->is_intercomm)
    {
      Message_Check_intercomm(comm);
    }

    return Message_Check_intracomm(comm);
  }

  /*!
     \brief Moves every message currently visible on comm->mpi_comm into the
            message queue of the local endpoint it is addressed to.

     The MPI tag is decoded as: bits 0-7 destination local rank, bits 8-15
     source local rank, bits 16-30 endpoint tag.

     \param comm endpoint communicator whose underlying MPI communicator is probed.
     \return MPI_SUCCESS.
   */
  int Message_Check_intracomm(MPI_Comm comm)
  {
    int flag = true;
    ::MPI_Status status;
    ::MPI_Message message;

    while(flag) // loop until the end of global queue
    {
      Debug("Message probing for intracomm\n");

      // Probe and match inside one critical section.
      #pragma omp critical (_mpi_call)
      {
        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status);
        if(flag)
        {
          Debug("find message in mpi comm \n");
          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status);
        }
      }

      if(flag)
      {
        MPI_Message msg = new ep_message;
        msg->mpi_message = new ::MPI_Message(message);

        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");

        msg->ep_tag  = std::bitset<15>(status.MPI_TAG >> 16).to_ulong();
        int src_loc  = std::bitset<8> (status.MPI_TAG >> 8) .to_ulong();
        int dest_loc = std::bitset<8> (status.MPI_TAG)      .to_ulong();
        int src_mpi  = status.MPI_SOURCE;

        msg->ep_src = get_ep_rank(comm, src_loc, src_mpi);
#ifdef _showinfo
        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
#endif

        msg->mpi_status = new ::MPI_Status(status);
        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");

        #pragma omp critical (_query)
        {
          #pragma omp flush
          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg);
          int dest_mpi = comm->ep_comm_ptr->size_rank_info[2].first;
          // NOTE(review): the indices printed between the brackets were lost in
          // the source as received; rebuilt from the in-scope dest_mpi/dest_loc.
          memcheck("message_queue["<<dest_mpi<<","<<dest_loc<<"]->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
          #pragma omp flush
        }
      }
    }

    return MPI_SUCCESS;
  }

  /*!
     \brief Moves every message currently visible on the MPI inter-communicator
            behind comm into the message queue of the local endpoint it is
            addressed to, then runs Message_Check_intracomm(comm).

     Tag decoding is the same as in Message_Check_intracomm().

     \param comm endpoint inter-communicator.
     \return 0 if comm has no MPI inter-communicator (nothing is probed in that
             case), otherwise MPI_SUCCESS.
   */
  int Message_Check_intercomm(MPI_Comm comm)
  {
    if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0;

    Debug("Message probing for intercomm\n");

    int flag = true;
    ::MPI_Message message;
    ::MPI_Status status;
    int current_ep_rank;
    MPI_Comm_rank(comm, &current_ep_rank);

    while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm"
    {
      // NOTE(review): this text says "intracomm" although the loop probes the
      // inter-communicator; kept as in the original.
      Debug("Message probing for intracomm\n");

      #pragma omp critical (_mpi_call)
      {
        ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status);
        if(flag)
        {
          Debug("find message in mpi comm \n");
          ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status);
        }
      }

      if(flag)
      {
        MPI_Message msg = new ep_message;
        msg->mpi_message = new ::MPI_Message(message);

        memcheck("new "<< msg <<" : in ep_lib::Message_Check, msg = new ep_message");
        memcheck("new "<< msg->mpi_message <<" : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message");

        msg->ep_tag  = std::bitset<15>(status.MPI_TAG >> 16).to_ulong();
        int src_loc  = std::bitset<8> (status.MPI_TAG >> 8) .to_ulong();
        int dest_loc = std::bitset<8> (status.MPI_TAG)      .to_ulong();
        int src_mpi  = status.MPI_SOURCE;

        msg->ep_src = get_ep_rank(comm, src_loc, src_mpi);
#ifdef _showinfo
        printf("status.MPI_TAG = %d, src_loc = %d, dest_loc = %d, ep_tag = %d\n", status.MPI_TAG, src_loc, dest_loc, msg->ep_tag);
#endif

        msg->mpi_status = new ::MPI_Status(status);
        memcheck("new "<< msg->mpi_status <<" : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status");

        #pragma omp critical (_query)
        {
          #pragma omp flush
          comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg);
          // NOTE(review): the index printed between the brackets was lost in
          // the source as received; rebuilt from the in-scope dest_loc.
          memcheck("comm->ep_comm_ptr->comm_list["<<dest_loc<<"]->ep_comm_ptr->message_queue->size = "<<comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->size());
          #pragma omp flush
        }
      }
    }

    Message_Check_intracomm(comm);

    return MPI_SUCCESS;
  }

  /*!
     \brief Debug helper; does nothing unless _showinfo is defined.

     With _showinfo defined it walks the calling thread's global
     EP_PendingRequests list, drops finished entries, and prints the remaining
     entries whose ep_src is greater than 6.

     \param EP_PendingRequest not used: the body works on the global
            EP_PendingRequests list. NOTE(review): confirm that is intended.
   */
  void show_EP_PendingRequests(std::list< ep_lib::MPI_Request* > * EP_PendingRequest)
  {
#ifdef _showinfo
    std::list< MPI_Request* >::iterator it;

    int i = 0;

    // erase() returns the next valid iterator, so nothing is skipped and the
    // iterator is never incremented past end().
    for(it = EP_PendingRequests->begin(); it != EP_PendingRequests->end(); )
    {
      if(*(*it) == 0 || (*(*it))->state == 2)
      {
        it = EP_PendingRequests->erase(it);
        // NOTE(review): the original message carried an index between
        // "EP_PendingRequests[" and "]"; that expression was lost in the
        // source as received, so only the size is reported here.
        memcheck("EP_PendingRequests->size() = " << EP_PendingRequests->size());
        continue;
      }

      if((*(*it))->ep_src > 6)
        printf("EP_PendingRequests[%d] : ep_src = %d, ep_tag = %d, type = %d, state = %d, comm = %d\n", i, (*(*it))->ep_src, (*(*it))->ep_tag, (*(*it))->type, (*(*it))->state, to_mpi_comm(((*(*it))->comm)->mpi_comm));

      i++;
      ++it;
    }
#endif
  }

}