/*! \file ep_gather.cpp \since 2 may 2016 \brief Definitions of MPI collective function: MPI_Gather, MPI_Allgather */ #include "ep_lib.hpp" #include #include "ep_declaration.hpp" #include #include "ep_mpi.hpp" using namespace std; namespace ep_lib { int MPI_Allgather_local(const void *sendbuf, int count, MPI_Datatype datatype, void *recvbuf, MPI_Comm comm) { assert(valid_type(datatype)); ::MPI_Aint datasize, lb; ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first; int num_ep = comm->ep_comm_ptr->size_rank_info[1].second; #pragma omp critical (write_buffer) comm->my_buffer->void_buffer[ep_rank_loc] = const_cast< void* >(sendbuf); MPI_Barrier_local(comm); #pragma omp critical (read_buffer) { for(int i=0; imy_buffer->void_buffer[i], datasize * count); } MPI_Barrier_local(comm); } int MPI_Allgather(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { if(!comm->is_ep) return ::MPI_Allgather(const_cast(sendbuf), sendcount, to_mpi_type(sendtype), recvbuf, recvcount, to_mpi_type(recvtype), to_mpi_comm(comm->mpi_comm)); if(comm->is_intercomm) return MPI_Allgather_intercomm(sendbuf, sendcount, sendtype, recvbuf, recvcount, recvtype, comm); assert(sendcount == recvcount); assert(valid_type(sendtype) && valid_type(recvtype)); MPI_Datatype datatype = sendtype; int count = sendcount; ::MPI_Aint datasize, lb; ::MPI_Type_get_extent(to_mpi_type(datatype), &lb, &datasize); int ep_rank = comm->ep_comm_ptr->size_rank_info[0].first; int ep_rank_loc = comm->ep_comm_ptr->size_rank_info[1].first; int mpi_rank = comm->ep_comm_ptr->size_rank_info[2].first; int ep_size = comm->ep_comm_ptr->size_rank_info[0].second; int num_ep = comm->ep_comm_ptr->size_rank_info[1].second; int mpi_size = comm->ep_comm_ptr->size_rank_info[2].second; bool is_master = ep_rank_loc==0; void* local_recvbuf; void* tmp_recvbuf; if(is_master) { local_recvbuf = new void*[datasize * num_ep * count]; tmp_recvbuf = new void*[datasize * count * ep_size]; } MPI_Gather_local(sendbuf, count, datatype, local_recvbuf, 0, comm); int* mpi_recvcounts; int *mpi_displs; if(is_master) { mpi_recvcounts = new int[mpi_size]; mpi_displs = new int[mpi_size]; int local_sendcount = num_ep * count; ::MPI_Allgather(&local_sendcount, 1, to_mpi_type(MPI_INT), mpi_recvcounts, 1, to_mpi_type(MPI_INT), to_mpi_comm(comm->mpi_comm)); mpi_displs[0] = 0; for(int i=1; impi_comm)); // reorder int offset; for(int i=0; iep_rank_map->at(i).second] + comm->ep_rank_map->at(i).first * sendcount; memcpy(recvbuf + i*sendcount*datasize, tmp_recvbuf+offset*datasize, sendcount*datasize); } delete[] mpi_recvcounts; delete[] mpi_displs; } MPI_Bcast_local(recvbuf, count*ep_size, datatype, 0, comm); if(is_master) { delete[] local_recvbuf; delete[] tmp_recvbuf; } } int MPI_Allgather_intercomm(const void *sendbuf, int sendcount, MPI_Datatype sendtype, void *recvbuf, int recvcount, MPI_Datatype recvtype, MPI_Comm comm) { printf("MPI_Allgather_intercomm not yet implemented\n"); MPI_Abort(comm, 0); } }