#include "mpi_routing.hpp" #include "mpi.hpp" #include "node.hpp" #include "elt.hpp" #include "timerRemap.hpp" #include #ifdef _usingEP #include "ep_declaration.hpp" #endif namespace sphereRemap { const int verbose = 0; CMPIRouting::CMPIRouting(MPI_Comm comm) : communicator(comm) { MPI_Comm_rank(comm, &mpiRank); MPI_Comm_size(comm, &mpiSize); } /* sparse alltoallv when it is known that only a subset of ranks will communicate, but message lengths are *known* to receiver */ template void alltoalls_known(const vector >& send, vector >& recv, const vector& ranks, MPI_Comm communicator) { vector request(ranks.size() * 2); vector status(ranks.size() * 2); // communicate data int nbRequest = 0; for (int i = 0; i < ranks.size(); i++) if (recv[i].size()) MPI_Irecv(&recv[i][0], recv[i].size()*sizeof(T), MPI_CHAR, ranks[i], 0, communicator, &request[nbRequest++]); for (int i = 0; i < ranks.size(); i++) if (send[i].size()) MPI_Isend((void *) &send[i][0], send[i].size()*sizeof(T), MPI_CHAR, ranks[i], 0, communicator, &request[nbRequest++]); MPI_Waitall(nbRequest, &request[0], &status[0]); } /* sparse alltoallv when it is known that only a subset of ranks will communicate, but message lengths are *unknown* to receiver */ template void alltoalls_unknown(const vector >& send, vector >& recv, const vector& ranks, MPI_Comm communicator) { vector request(ranks.size() * 2); vector status(ranks.size() * 2); // communicate sizes int nbRequest = 0; vector sendSizes(ranks.size()); vector recvSizes(ranks.size()); for (int i = 0; i < ranks.size(); i++) sendSizes[i] = send[i].size(); for (int i = 0; i < ranks.size(); i++) MPI_Irecv(&recvSizes[i], 1, MPI_INT, ranks[i], 0, communicator, &request[nbRequest++]); for (int i = 0; i < ranks.size(); i++) MPI_Isend(&sendSizes[i], 1, MPI_INT, ranks[i], 0, communicator, &request[nbRequest++]); MPI_Waitall(nbRequest, &request[0], &status[0]); // allocate for (int i = 0; i < ranks.size(); i++) if (recvSizes[i]) recv[i].resize(recvSizes[i]); // communicate data alltoalls_known(send, recv, ranks, communicator); } void setElementsSendCnt(int route, vector& sendCnt) { sendCnt[route]++; } void setElementsSendCnt(const vector& route, vector& sendCnt) { for (int i = 0; i < route.size(); i++) sendCnt[route[i]]++; } void setTargetElementIndex(int route, vector& indices, const int *rankToIndex) { int index = rankToIndex[route]; indices.push_back(index); } void setTargetElementIndex(const vector& route, vector& indices, const int *rankToIndex) { for (int i = 0; i < route.size(); i++) { int index = rankToIndex[route[i]]; indices.push_back(index); } } template void CMPIRouting::init(const vector& route, CMPICascade *cascade) { vector nbElementSend(mpiSize); int *toSend = new int[mpiSize]; int *recvCount = new int[mpiSize]; int *targetRankToIndex; for (int i = 0; i < route.size(); i++) setElementsSendCnt(route[i], nbElementSend); nbTarget = 0; vector destRanks; for (int i = 0; i < mpiSize; i++) { if (nbElementSend[i] == 0) toSend[i] = 0; else { destRanks.push_back(i); toSend[i] = 1; nbTarget++; } recvCount[i] = 1; } //int recvCntDeb = (stree) ? 
  //int recvCntDeb = (stree) ? stree->scatter_reduce(destRanks) : -1;

  MPI_Barrier(communicator);
  CTimer::get("CMPIRouting::init(reduce_scatter)").reset();
  CTimer::get("CMPIRouting::init(reduce_scatter)").resume();

  MPI_Reduce_scatter(toSend, &nbSource, recvCount, MPI_INT, MPI_SUM, communicator);

  CTimer::get("CMPIRouting::init(reduce_scatter)").suspend();
  CTimer::get("CMPIRouting::init(reduce_scatter)").print();

  MPI_Alloc_mem(nbTarget * sizeof(int), MPI_INFO_NULL, &targetRank);
  MPI_Alloc_mem(nbSource * sizeof(int), MPI_INFO_NULL, &sourceRank);

  targetRankToIndex = new int[mpiSize];
  int index = 0;
  for (int i = 0; i < mpiSize; i++)
  {
    if (toSend[i] == 1)
    {
      targetRank[index] = i;
      targetRankToIndex[i] = index;
      index++;
    }
  }

  MPI_Barrier(communicator);
  CTimer::get("CMPIRouting::init(get_source)").reset();
  CTimer::get("CMPIRouting::init(get_source)").resume();

  MPI_Request *request = new MPI_Request[nbSource + nbTarget];
  MPI_Status *status = new MPI_Status[nbSource + nbTarget];
  int indexRequest = 0;

  if (verbose) cout << "CMPIRouting::init nbSource : " << nbSource << " nbTarget : " << nbTarget << endl;
  // assert(recvCntDeb == -1 || recvCntDeb == nbSource);
  //cout << mpiRank << " DEB : " << recvCntDeb << " nbSource " << nbSource << " nbTarget : " << nbTarget << endl;

  // learn who will send to us: every rank sends its own rank id to each of its destinations
  for (int i = 0; i < nbSource; i++)
  {
#ifdef _usingEP
    MPI_Irecv(&sourceRank[i], 1, MPI_INT, -1, 0, communicator, &request[indexRequest]);
#else
    MPI_Irecv(&sourceRank[i], 1, MPI_INT, MPI_ANY_SOURCE, 0, communicator, &request[indexRequest]);
#endif
    indexRequest++;
  }
  MPI_Barrier(communicator);
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Isend(&mpiRank, 1, MPI_INT, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);
  MPI_Barrier(communicator);  //needed

  CTimer::get("CMPIRouting::init(get_source)").suspend();
  CTimer::get("CMPIRouting::init(get_source)").print();

  CTimer::get("CMPIRouting::init(get_source)").reset();
  CTimer::get("CMPIRouting::init(get_source)").resume();

  // the rank exchange above is performed a second time and re-timed after a reset
  indexRequest = 0;
  for (int i = 0; i < nbSource; i++)
  {
#ifdef _usingEP
    MPI_Irecv(&sourceRank[i], 1, MPI_INT, -1, 0, communicator, &request[indexRequest]);
#else
    MPI_Irecv(&sourceRank[i], 1, MPI_INT, MPI_ANY_SOURCE, 0, communicator, &request[indexRequest]);
#endif
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Isend(&mpiRank, 1, MPI_INT, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);
  MPI_Barrier(communicator);

  CTimer::get("CMPIRouting::init(get_source)").suspend();
  CTimer::get("CMPIRouting::init(get_source)").print();

  CTimer::get("CMPIRouting::init(send_element)").reset();
  CTimer::get("CMPIRouting::init(send_element)").resume();

  nbTargetElement.resize(nbTarget);
  nbSourceElement.resize(nbSource);

  for (int i = 0; i < route.size(); i++)
    setTargetElementIndex(route[i], targetElementIndex, targetRankToIndex);

  for (int i = 0; i < targetElementIndex.size(); i++)
    nbTargetElement[targetElementIndex[i]]++;

  // exchange per-neighbour element counts
  indexRequest = 0;
  totalSourceElement = 0;
  totalTargetElement = 0;
  for (int i = 0; i < nbSource; i++)
  {
    MPI_Irecv(&nbSourceElement[i], 1, MPI_INT, sourceRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    totalTargetElement += nbTargetElement[i];
    MPI_Isend(&nbTargetElement[i], 1, MPI_INT, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);

  CTimer::get("CMPIRouting::init(send_element)").suspend();
  CTimer::get("CMPIRouting::init(send_element)").print();
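  // nbSourceElement[i] now holds how many elements rank sourceRank[i] will route to us;
  // expand these counts into sourceElementIndex, which maps each incoming element to the
  // position (in sourceRank) of the source it arrives from.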
  totalSourceElement = 0;
  for (int i = 0; i < nbSource; i++)
    totalSourceElement += nbSourceElement[i];

  sourceElementIndex.resize(totalSourceElement);

  totalSourceElement = 0;
  for (int i = 0; i < nbSource; i++)
  {
    for (int j = 0; j < nbSourceElement[i]; j++)
    {
      sourceElementIndex[totalSourceElement] = i;
      totalSourceElement++;
    }
  }

  delete[] toSend;
  delete[] recvCount;
  delete[] targetRankToIndex;
  delete[] request;
  delete[] status;
}

int CMPIRouting::getTotalSourceElement(void)
{
  return totalSourceElement;
}

template <typename T>
void CMPIRouting::transferToTarget(T* targetElements, T* sourceElements)
{
  char** targetBuffer = new char*[nbTarget];
  int* indexTargetBuffer = new int[nbTarget];

  for (int i = 0; i < nbTarget; i++)
  {
    targetBuffer[i] = new char[sizeof(T)*nbTargetElement[i]];
    indexTargetBuffer[i] = 0;
  }

  char** sourceBuffer = new char*[nbSource];
  int* indexSourceBuffer = new int[nbSource];

  for (int i = 0; i < nbSource; i++)
  {
    sourceBuffer[i] = new char[sizeof(T)*nbSourceElement[i]];
    indexSourceBuffer[i] = 0;
  }

  // pack the data
  int index;
  for (int i = 0; i < totalTargetElement; i++)
  {
    index = targetElementIndex[i];
    *((T*) &(targetBuffer[index][indexTargetBuffer[index]])) = targetElements[i];
    indexTargetBuffer[index] += sizeof(T);
  }

  MPI_Request* request = new MPI_Request[nbSource + nbTarget];
  MPI_Status* status = new MPI_Status[nbSource + nbTarget];
  int indexRequest = 0;

  for (int i = 0; i < nbSource; i++)
  {
    MPI_Irecv(sourceBuffer[i], nbSourceElement[i]*sizeof(T), MPI_CHAR, sourceRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Isend(targetBuffer[i], nbTargetElement[i]*sizeof(T), MPI_CHAR, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);

  // unpack the data
  for (int i = 0; i < totalSourceElement; i++)
  {
    index = sourceElementIndex[i];
    sourceElements[i] = *((T*) &(sourceBuffer[index][indexSourceBuffer[index]]));
    indexSourceBuffer[index] += sizeof(T);
  }

  for (int i = 0; i < nbTarget; i++) delete[] targetBuffer[i];
  for (int i = 0; i < nbSource; i++) delete[] sourceBuffer[i];
  delete[] targetBuffer;
  delete[] indexTargetBuffer;
  delete[] sourceBuffer;
  delete[] indexSourceBuffer;
  delete[] request;
  delete[] status;
}

template <typename T, typename t_pack, typename t_unpack>
void CMPIRouting::transferToTarget(T* targetElements, T* sourceElements, t_pack pack, t_unpack unpack)
{
  char** targetBuffer = new char*[nbTarget];
  int* indexTargetBuffer = new int[nbTarget];
  int* targetMessageSize = new int[nbTarget];
  int* sourceMessageSize = new int[nbSource];
  int index;

  // compute target buffer size: a pack() call with a NULL buffer only accumulates the size
  for (int i = 0; i < nbTarget; i++) targetMessageSize[i] = 0;
  for (int i = 0; i < totalTargetElement; i++)
  {
    index = targetElementIndex[i];
    pack(targetElements[i], NULL, targetMessageSize[index]);
  }

  MPI_Request *request = new MPI_Request[nbSource + nbTarget];
  MPI_Status *status = new MPI_Status[nbSource + nbTarget];
  int indexRequest = 0;

  MPI_Barrier(communicator);
  CTimer::get("CMPIRouting::transferToTarget(messageSize)").reset();
  CTimer::get("CMPIRouting::transferToTarget(messageSize)").resume();

  // exchange message sizes
  for (int i = 0; i < nbSource; i++)
  {
    MPI_Irecv(&sourceMessageSize[i], 1, MPI_INT, sourceRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Isend(&targetMessageSize[i], 1, MPI_INT, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);

  CTimer::get("CMPIRouting::transferToTarget(messageSize)").suspend();
  CTimer::get("CMPIRouting::transferToTarget(messageSize)").print();

  // allocate buffers now that sizes are known
  for (int i = 0; i < nbTarget; i++)
  {
    targetBuffer[i] = new char[targetMessageSize[i]];
    indexTargetBuffer[i] = 0;
  }

  char** sourceBuffer = new char*[nbSource];
  int* indexSourceBuffer = new int[nbSource];
  for (int i = 0; i < nbSource; i++)
  {
    sourceBuffer[i] = new char[sourceMessageSize[i]];
    indexSourceBuffer[i] = 0;
  }

  // pack the data
  for (int i = 0; i < totalTargetElement; i++)
  {
    index = targetElementIndex[i];
    pack(targetElements[i], targetBuffer[index], indexTargetBuffer[index]);
  }

  // exchange the data
  indexRequest = 0;
  for (int i = 0; i < nbSource; i++)
  {
    MPI_Irecv(sourceBuffer[i], sourceMessageSize[i], MPI_CHAR, sourceRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Isend(targetBuffer[i], targetMessageSize[i], MPI_CHAR, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);

  // unpack the data
  for (int i = 0; i < totalSourceElement; i++)
  {
    index = sourceElementIndex[i];
    unpack(sourceElements[i], sourceBuffer[index], indexSourceBuffer[index]);
  }

  for (int i = 0; i < nbTarget; i++) delete[] targetBuffer[i];
  for (int i = 0; i < nbSource; i++) delete[] sourceBuffer[i];
  delete[] targetBuffer;
  delete[] indexTargetBuffer;
  delete[] targetMessageSize;
  delete[] sourceBuffer;
  delete[] indexSourceBuffer;
  delete[] sourceMessageSize;
  delete[] request;
  delete[] status;
}

template <typename T>
void CMPIRouting::transferFromSource(T* targetElements, T* sourceElements)
{
  char** targetBuffer = new char*[nbTarget];
  int* indexTargetBuffer = new int[nbTarget];

  for (int i = 0; i < nbTarget; i++)
  {
    targetBuffer[i] = new char[sizeof(T)*nbTargetElement[i]];
    indexTargetBuffer[i] = 0;
  }

  char** sourceBuffer = new char*[nbSource];
  int* indexSourceBuffer = new int[nbSource];

  for (int i = 0; i < nbSource; i++)
  {
    sourceBuffer[i] = new char[sizeof(T)*nbSourceElement[i]];
    indexSourceBuffer[i] = 0;
  }

  // pack the data
  int index;
  for (int i = 0; i < totalSourceElement; i++)
  {
    index = sourceElementIndex[i];
    *((T*) &(sourceBuffer[index][indexSourceBuffer[index]])) = sourceElements[i];
    indexSourceBuffer[index] += sizeof(T);
  }

  MPI_Request* request = new MPI_Request[nbSource + nbTarget];
  MPI_Status* status = new MPI_Status[nbSource + nbTarget];
  int indexRequest = 0;

  for (int i = 0; i < nbSource; i++)
  {
    MPI_Isend(sourceBuffer[i], nbSourceElement[i]*sizeof(T), MPI_CHAR, sourceRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Irecv(targetBuffer[i], nbTargetElement[i]*sizeof(T), MPI_CHAR, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);

  // unpack the data
  for (int i = 0; i < totalTargetElement; i++)
  {
    index = targetElementIndex[i];
    targetElements[i] = *((T*) &(targetBuffer[index][indexTargetBuffer[index]]));
    indexTargetBuffer[index] += sizeof(T);
  }

  for (int i = 0; i < nbTarget; i++) delete[] targetBuffer[i];
  for (int i = 0; i < nbSource; i++) delete[] sourceBuffer[i];
  delete[] targetBuffer;
  delete[] indexTargetBuffer;
  delete[] sourceBuffer;
  delete[] indexSourceBuffer;
  delete[] request;
  delete[] status;
}

template <typename T, typename t_pack, typename t_unpack>
void CMPIRouting::transferFromSource(T *targetElements, T *sourceElements, t_pack pack, t_unpack unpack)
{
  char **targetBuffer = new char*[nbTarget];
  int *indexTargetBuffer = new int[nbTarget];
  int *targetMessageSize = new int[nbTarget];
  int *sourceMessageSize = new int[nbSource];
  int index;

  // compute source buffer size: a pack() call with a NULL buffer only accumulates the size
  for (int i = 0; i < nbSource; i++) sourceMessageSize[i] = 0;
  for (int i = 0; i < totalSourceElement; i++)
  {
    index = sourceElementIndex[i];
    pack(sourceElements[i], NULL, sourceMessageSize[index]);
  }

  MPI_Request *request = new MPI_Request[nbSource + nbTarget];
  MPI_Status *status = new MPI_Status[nbSource + nbTarget];
  int indexRequest = 0;

  // exchange message sizes
  for (int i = 0; i < nbSource; i++)
  {
    MPI_Isend(&sourceMessageSize[i], 1, MPI_INT, sourceRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Irecv(&targetMessageSize[i], 1, MPI_INT, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);

  // allocate buffers now that sizes are known
  for (int i = 0; i < nbTarget; i++)
  {
    targetBuffer[i] = new char[targetMessageSize[i]];
    indexTargetBuffer[i] = 0;
  }

  char **sourceBuffer = new char*[nbSource];
  int *indexSourceBuffer = new int[nbSource];
  for (int i = 0; i < nbSource; i++)
  {
    sourceBuffer[i] = new char[sourceMessageSize[i]];
    indexSourceBuffer[i] = 0;
  }

  // pack the data
  for (int i = 0; i < totalSourceElement; i++)
  {
    index = sourceElementIndex[i];
    pack(sourceElements[i], sourceBuffer[index], indexSourceBuffer[index]);
  }

  // exchange the data
  indexRequest = 0;
  for (int i = 0; i < nbSource; i++)
  {
    MPI_Isend(sourceBuffer[i], sourceMessageSize[i], MPI_CHAR, sourceRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  for (int i = 0; i < nbTarget; i++)
  {
    MPI_Irecv(targetBuffer[i], targetMessageSize[i], MPI_CHAR, targetRank[i], 0, communicator, &request[indexRequest]);
    indexRequest++;
  }
  MPI_Waitall(indexRequest, request, status);

  // unpack the data
  for (int i = 0; i < totalTargetElement; i++)
  {
    index = targetElementIndex[i];
    unpack(targetElements[i], targetBuffer[index], indexTargetBuffer[index]);
  }

  for (int i = 0; i < nbTarget; i++) delete[] targetBuffer[i];
  for (int i = 0; i < nbSource; i++) delete[] sourceBuffer[i];
  delete[] targetBuffer;
  delete[] indexTargetBuffer;
  delete[] targetMessageSize;
  delete[] sourceBuffer;
  delete[] indexSourceBuffer;
  delete[] sourceMessageSize;
  delete[] request;
  delete[] status;
}

CMPIRouting::~CMPIRouting()
{
}

// explicit instantiations
template void CMPIRouting::init(const std::vector<int>& route, CMPICascade *cascade);
template void CMPIRouting::init(const std::vector<vector<int> >& route, CMPICascade *cascade);
template void CMPIRouting::transferToTarget(int *targetElements, int *sourceElements);
template void CMPIRouting::transferToTarget(int *targetElements, int *sourceElements, void (*pack)(int&, char*, int&), void (*unpack)(int&, char *, int&));
template void CMPIRouting::transferFromSource(int *targetElements, int *sourceElements);
template void CMPIRouting::transferFromSource(int *targetElements, int *sourceElements, void (*pack)(int&, char*, int&), void (*unpack)(int&, char *, int&));

template void CMPIRouting::transferToTarget(Node* targetElements, Node* sourceElements, void (*pack)(Node&, char*, int&), void (*unpack)(Node&, char *, int&));
template void CMPIRouting::transferToTarget(Elt **targetElements, Elt **sourceElements, void (*pack)(Elt *, char*, int&), void (*unpack)(Elt *, char *, int&));
template void CMPIRouting::transferFromSource(std::vector<int> *targetElements, std::vector<int> *sourceElements, void (*pack)(const std::vector<int>&, char*, int&), void (*unpack)(std::vector<int>&, const char *, int&));

struct NES { int cnt; int risc; int rank; };

template void alltoalls_unknown(const std::vector<std::vector<NES> >& send, std::vector<std::vector<NES> >& recv,
                                const std::vector<int>& ranks, MPI_Comm communicator);
template void alltoalls_known(const std::vector<std::vector<int> >& send, std::vector<std::vector<int> >& recv,
                              const std::vector<int>& ranks, MPI_Comm communicator);

}
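/* Usage sketch (illustrative only, kept inside a comment so it is not compiled with the library):
   how a caller might drive CMPIRouting with the int instantiations above. The function name and
   the meaning of its arguments are assumptions for this example, not part of the file.

   void exampleRouteInts(MPI_Comm comm, const std::vector<int>& route, std::vector<int>& values)
   {
     // route[i] is the destination rank of local element i; values[i] is the payload to send,
     // so 'values' must have the same length as 'route'.
     CMPIRouting router(comm);
     router.init(route, NULL);   // init() contains collective calls, so every rank of 'comm' must
                                 // call it; the cascade pointer is not dereferenced by the code above

     std::vector<int> received(router.getTotalSourceElement());
     // assumes 'values' and 'received' are non-empty on this rank
     router.transferToTarget(&values[0], &received[0]);   // send 'values' along 'route', fill 'received'
   }
*/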