/*! \file ep_message.cpp \since 2 may 2016 \brief Definitions of MPI endpoint function: Message_Check */ #include "ep_lib.hpp" #include #include "ep_declaration.hpp" #include "ep_mpi.hpp" using namespace std; extern std::list< ep_lib::MPI_Request* > * EP_PendingRequests; #pragma omp threadprivate(EP_PendingRequests) namespace ep_lib { int Request_Check() { MPI_Status status; MPI_Message *message; int probed = false; int recv_count = 0; std::list::iterator it; if(EP_PendingRequests == 0 ) return 0; for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); it++) { Message_Check(((*(*it))->comm)); } for(it = EP_PendingRequests->begin(); it!=EP_PendingRequests->end(); ) { if((*(*it))->state == 0) { message = new MPI_Message; *message = new ep_message; printf("new %p : in ep_lib::Request_Check, message = new MPI_Message\n", message); printf("new %p : in ep_lib::Request_Check, *message = new ep_message\n", *message); MPI_Improbe((*(*it))->ep_src, (*(*it))->ep_tag, ((*(*it))->comm), &probed, message, &status); printf("in Request_Check, after improbe, mpi_status = %p\n", to_mpi_status_ptr(status)); if(probed) { ::MPI_Get_count(to_mpi_status_ptr(status), to_mpi_type((*(*it))->ep_datatype), &recv_count); printf("in Request_Check, imrecv, buf = %p, recv_count = %d, status = %p\n", (*(*it))->buf, recv_count, to_mpi_status_ptr(status)); MPI_Imrecv((*(*it))->buf, recv_count, (*(*it))->ep_datatype, message, *it); (*(*it))->type = 3; (*(*it))->state = 1; printf("delete %p : in ep_lib::Request_Check, delete *message\n", *message); printf("delete %p : in ep_lib::Request_Check, delete message\n", message); delete *message; delete message; it++; continue; } } if((*(*it))->state == 2) { printf("delete %p : in ep_lib::Request_Check, delete (*(*it))\n", (*(*it))); delete (*(*it)); EP_PendingRequests->erase(it); printf("EP_PendingRequests->size() = %lu\n", EP_PendingRequests->size()); it = EP_PendingRequests->begin(); continue; } else it++; } } int Message_Check(MPI_Comm comm) { if(!comm->is_ep) return MPI_SUCCESS; if(comm->is_intercomm) { return Message_Check_intercomm(comm); } return Message_Check_intracomm(comm); } int Message_Check_intracomm(MPI_Comm comm) { int flag = true; ::MPI_Status status; ::MPI_Message message; while(flag) // loop until the end of global queue { Debug("Message probing for intracomm\n"); #pragma omp critical (_mpi_call) { ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->mpi_comm), &flag, &status); if(flag) { Debug("find message in mpi comm \n"); ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->mpi_comm), &message, &status); } } if(flag) { MPI_Message msg = new ep_message; msg->mpi_message = new ::MPI_Message(message); printf("new %p : in ep_lib::Message_Check, msg = new ep_message\n", msg); printf("new %p : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message\n", msg->mpi_message); msg->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); int src_mpi = status.MPI_SOURCE; msg->ep_src = get_ep_rank(comm, src_loc, src_mpi); msg->mpi_status = new ::MPI_Status(status); printf("new %p : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status\n", msg->mpi_status); #pragma omp critical (_query) { #pragma omp flush comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); int test_count; ::MPI_Get_count(static_cast< ::MPI_Status* >(msg->mpi_status), 1275070475, &test_count); printf("status1 = %p, test_count2 = %d\n", static_cast< ::MPI_Status* >(msg->mpi_status), test_count); ::MPI_Get_count(static_cast< ::MPI_Status* >(comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->back()->mpi_status), 1275070475, &test_count); printf("status2 = %p, test_count2 = %d\n", static_cast< ::MPI_Status* >(comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->back()->mpi_status), test_count); #pragma omp flush } } } return MPI_SUCCESS; } int Message_Check_intercomm(MPI_Comm comm) { if(!comm->ep_comm_ptr->intercomm->mpi_inter_comm) return 0; Debug("Message probing for intercomm\n"); int flag = true; ::MPI_Message message; ::MPI_Status status; int current_ep_rank; MPI_Comm_rank(comm, ¤t_ep_rank); while(flag) // loop until the end of global queue "comm->ep_comm_ptr->intercomm->mpi_inter_comm" { Debug("Message probing for intracomm\n"); #pragma omp critical (_mpi_call) { ::MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &flag, &status); if(flag) { Debug("find message in mpi comm \n"); ::MPI_Mprobe(status.MPI_SOURCE, status.MPI_TAG, to_mpi_comm(comm->ep_comm_ptr->intercomm->mpi_inter_comm), &message, &status); } } if(flag) { MPI_Message msg = new ep_message; msg->mpi_message = new ::MPI_Message(message); printf("new %p : in ep_lib::Message_Check, msg = new ep_message\n", msg); printf("new %p : in ep_lib::Message_Check, msg->mpi_message = new ::MPI_Message\n", msg->mpi_message); msg->ep_tag = bitset<15>(status.MPI_TAG >> 16).to_ulong(); int src_loc = bitset<8> (status.MPI_TAG >> 8) .to_ulong(); int dest_loc = bitset<8> (status.MPI_TAG) .to_ulong(); int src_mpi = status.MPI_SOURCE; int current_inter = comm->ep_comm_ptr->intercomm->local_rank_map->at(current_ep_rank).first; msg->ep_src = get_ep_rank(comm, src_loc, src_mpi); msg->mpi_status = new ::MPI_Status(status); printf("new %p : in ep_lib::Message_Check, msg->mpi_status = new ::MPI_Status\n", msg->mpi_status); #pragma omp critical (_query) { #pragma omp flush comm->ep_comm_ptr->comm_list[dest_loc]->ep_comm_ptr->message_queue->push_back(msg); #pragma omp flush } } } Message_Check_intracomm(comm); return MPI_SUCCESS; } }