/*!
   \file client_client_dht.cpp
   \author Ha NGUYEN
   \since 15 Sep 2015
   \date 15 Sep 2015

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht.hpp"
#include <limits>
#include <cmath>
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{

CClientClientDHT::CClientClientDHT(const boost::unordered_map<size_t,int>& indexInfoMap,
                                   const MPI_Comm& clientIntraComm, bool isDataDistributed, int hierarLvl)
  : intraCommRoot_(clientIntraComm), commLevel_(), isDataDistributed_(isDataDistributed), nbLevel_(hierarLvl),
    globalIndexToServerMapping_(), globalIndexToInfoMappingLevel_()
{
  computeMPICommLevel(clientIntraComm);
  int lvl = commLevel_.size() - 1;
  computeDistributedIndex(indexInfoMap, commLevel_[lvl], lvl);
}

CClientClientDHT::~CClientClientDHT()
{
}

/*!
  Calculate the MPI communicator for each level of the hierarchy.
  \param[in] mpiCommRoot MPI communicator of level 0 (usually the communicator of all clients)
*/
void CClientClientDHT::computeMPICommLevel(const MPI_Comm& mpiCommRoot)
{
  int nbProc;
  MPI_Comm_size(mpiCommRoot, &nbProc);
  if (nbLevel_ > nbProc) nbLevel_ = std::log(nbProc);
  else if (1 > nbLevel_) nbLevel_ = 1;

  commLevel_.push_back(mpiCommRoot);
  divideMPICommLevel(mpiCommRoot, nbLevel_);
}

/*!
  Divide each MPI communicator into sub-communicators. Recursive function.
  \param [in] mpiCommLevel MPI communicator of the current level
  \param [in] level current level
*/
void CClientClientDHT::divideMPICommLevel(const MPI_Comm& mpiCommLevel, int level)
{
  int clientRank;
  MPI_Comm_rank(mpiCommLevel, &clientRank);
  --level;
  if (0 < level)
  {
    int color = clientRank % 2;
    commLevel_.push_back(MPI_Comm());
    MPI_Comm_split(mpiCommLevel, color, 0, &(commLevel_.back()));
    divideMPICommLevel(commLevel_.back(), level);
  }
}

/*!
  Compute the mapping between indices and the information corresponding to these indices.
  \param [in] indices indices a proc has
*/
void CClientClientDHT::computeServerIndexMapping(const CArray<size_t,1>& indices)
{
  int lvl = commLevel_.size() - 1;
  computeIndexMapping(indices, commLevel_[lvl], lvl);
  size_t size = indices.numElements();
  for (size_t idx = 0; idx < size; ++idx)
  {
    int serverIdx = globalIndexToInfoMappingLevel_[indices(idx)];
    indexGlobalOnServer_[serverIdx].push_back(indices(idx));
  }
}
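/*
  A minimal usage sketch (illustrative only, not part of the class): "clientIntraComm",
  the index values and the server ranks below are hypothetical. Each client fills a map
  from the global indices it holds to the server rank owning them, builds the DHT
  collectively at construction, then queries the location of an arbitrary set of indices;
  the result is accumulated in the indexGlobalOnServer_ member. Both the constructor and
  computeServerIndexMapping are collective over clientIntraComm (they involve MPI_Allreduce).

  \code
  boost::unordered_map<size_t,int> indexInfoMap;   // global index -> server rank
  indexInfoMap[1024] = 3;                          // hypothetical entries
  indexInfoMap[2048] = 7;

  CClientClientDHT dht(indexInfoMap, clientIntraComm, true, 2);  // 2 levels of hierarchy

  CArray<size_t,1> queried(2);
  queried(0) = 4096; queried(1) = 8192;            // indices held by other clients
  dht.computeServerIndexMapping(queried);
  \endcode
*/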
/*!
  Compute the mapping between indices and the information corresponding to these indices
  for each level of the hierarchical DHT. Recursive function.
  \param [in] indices indices a proc has
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
void CClientClientDHT::computeIndexMapping(const CArray<size_t,1>& indices,
                                           const MPI_Comm& commLevel,
                                           int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel, &nbClient);
  MPI_Comm_rank(commLevel, &clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::map<int, std::vector<size_t> > client2ClientIndex;

  // Number of global indices whose mapping server can be found thanks to the local index-server mapping
  int nbIndexAlreadyOnClient = 0;

  // Number of global indices whose mapping server is on other clients
  int nbIndexSendToOthers = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t index = indices(i);
    hashedVal = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient].push_back(index);
        ++nbIndexSendToOthers;
      }
    }
  }
  info << "level " << level << " nbIndexsendtoOther " << nbIndexSendToOthers << std::endl;

  int* sendBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator itb = client2ClientIndex.begin(), it,
                                                ite = client2ClientIndex.end();
  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
    for (it = itb; it != ite; ++it)
      sendIndexToClients(it->first, it->second, commLevel, sendRequest);

  int nbDemandingClient = recvBuff[clientRank], nbIndexServerReceived = 0;

  // Receive the demands as well as the responses from other clients.
  // The demand messages contain global indices; the responses carry the server index information.

  // Buffer to receive demands from other clients; it may or may not be allocated depending on whether there are demands.
  // In some cases duplicate indices are demanded, so the maximum size of the demand buffer must be determined.
  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);

  unsigned long* recvBuffIndex = 0;
  int maxNbIndexDemandedFromOthers = recvBuff[clientRank];
//  if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * nbIndexSendToOthers; //globalIndexToServerMapping_.size(); // Not very optimal but it's general

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer to receive responses from other clients; it may or may not be allocated depending on whether this client demands anything
  int* recvBuffInfo = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffInfo = new int[nbIndexReceivedFromOthers];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;

  // Count of the buffer for receiving global indices
  int countIndex = 0;
  // Requests returned by the MPI_Irecv calls for global indices
  std::map<int, MPI_Request> requestRecvIndex;
  // Mapping between a client rank and the beginning position of the receiving buffer for its global-index message
  std::map<int, unsigned long*> indexBuffBegin;

  std::map<int, std::vector<size_t> > src2Index; // Temporary mapping containing the demands (source rank and associated indices) at the current level
  CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);
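  // Main exchange loop of this level: it runs until every demand message expected by this
  // client (nbDemandingClient of them, known from the first MPI_Allreduce) has been received
  // and all of its own outgoing index messages have completed. Received indices are stored
  // per source rank in src2Index and also flattened into tmpGlobalIndexOnClient, which feeds
  // the recursive call to the next (lower) level.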
  int k = 0;
  while ((0 < nbDemandingClient) || (!sendRequest.empty()))
  {
    // Just check whether this client has any demand from other clients.
    // If it has, it should send responses to these client(s)
    probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndex.begin();
           itRequest != requestRecvIndex.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            src2Index[clientSourceRank].push_back(*(beginBuff+i));
            tmpGlobalIndexOnClient(k) = *(beginBuff+i);
            ++k;
          }
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i < demandAlreadyReceived.size(); ++i)
        requestRecvIndex.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendRequest);
  }

  if (0 < level)
  {
    --level;
    computeIndexMapping(tmpGlobalIndexOnClient, commLevel_[level], level);
  }
  else
    globalIndexToInfoMappingLevel_ = globalIndexToServerMapping_;

  std::map<int, std::vector<int> > client2ClientInfo;
  std::list<MPI_Request> sendInfoRequest;
  std::map<int, std::vector<size_t> >::iterator itbSrc2Idx = src2Index.begin(), itSrc2Idx,
                                                iteSrc2Idx = src2Index.end();
  for (itSrc2Idx = itbSrc2Idx; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx)
  {
    int clientSourceRank = itSrc2Idx->first;
    std::vector<size_t>& srcIdx = itSrc2Idx->second;
    for (int idx = 0; idx < srcIdx.size(); ++idx)
    {
//      client2ClientInfo[clientSourceRank].push_back(globalIndexToServerMapping_[srcIdx[idx]]);
      client2ClientInfo[clientSourceRank].push_back(globalIndexToInfoMappingLevel_[srcIdx[idx]]);
    }
    sendInfoToClients(clientSourceRank, client2ClientInfo[clientSourceRank], commLevel, sendInfoRequest);
  }

  boost::unordered_map<size_t,int> indexToInfoMapping;

  // Count of the buffer for receiving server indices
  int countInfo = 0;

  std::map<int, MPI_Request> requestRecvInfo;

  // Mapping between a client rank and the beginning position of the receiving buffer for its server-index message
  std::map<int, int*> infoBuffBegin;

  while ((!sendInfoRequest.empty()) || (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    testSendRequest(sendInfoRequest);

    // In some cases, a client needs to listen for responses from other clients about server information.
    // With this information, the client can fill in its server-global index map.
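    // Responses are matched positionally: the responder answers each demand in the order it
    // received the indices, so element i of the MPI_DHT_INFO message from rank r is the info
    // for client2ClientIndex[r][i], i.e. the i-th index this client previously sent to r.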
    probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);

    for (itRequest = requestRecvInfo.begin();
         itRequest != requestRecvInfo.end(); ++itRequest)
    {
      int flagInfo, count;
      MPI_Status statusInfo;

      MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
      if (true == flagInfo)
      {
        MPI_Get_count(&statusInfo, MPI_INT, &count);
        int clientSourceRank = statusInfo.MPI_SOURCE;
        int* beginBuff = infoBuffBegin[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndex[clientSourceRank];
        for (int i = 0; i < count; ++i)
        {
          indexToInfoMapping[globalIndexTmp[i]] = *(beginBuff+i);
//          globalIndexToServerMapping_[globalIndexTmp[i]] = *(beginBuff+i);
        }
        nbIndexServerReceived += count;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i < repondAlreadyReceived.size(); ++i)
      requestRecvInfo.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  globalIndexToInfoMappingLevel_ = indexToInfoMapping;
  info << "temp " << tmpGlobalIndexOnClient << std::endl;
  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
  delete [] sendBuff;
  delete [] recvBuff;
}

/*!
  Compute the hash index distribution of the whole size_t space; each client then owns a range of this distribution.
*/
void CClientClientDHT::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
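/*
  Illustration (not part of the class; "nbClient" and "someGlobalIndex" below are hypothetical
  locals of a member function): computeHashIndex splits the size_t hash space into nbClient
  nearly equal, half-open ranges [hashedIndex[i], hashedIndex[i+1]). The owner of a hashed
  value is then found with std::upper_bound, the same lookup used in computeIndexMapping and
  computeDistributedIndex:

  \code
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);                 // nbClient+1 range boundaries
  size_t hashedVal = HashXIOS<size_t>()(someGlobalIndex);
  std::vector<size_t>::const_iterator itClientHash =
    std::upper_bound(hashedIndex.begin(), hashedIndex.end(), hashedVal);
  if (hashedIndex.end() != itClientHash)                   // same guard as in the member functions
  {
    int owner = std::distance(hashedIndex.begin(), itClientHash) - 1;  // rank owning hashedVal
  }
  \endcode
*/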
/*!
  Compute the distribution of global indices among the servers.
  Each client already holds a piece of information and its attached index.
  This information will be redistributed among the processes by projecting the indices into the size_t space.
  After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually a server index)
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
void CClientClientDHT::computeDistributedIndex(const boost::unordered_map<size_t,int>& indexInfoMap,
                                               const MPI_Comm& commLevel,
                                               int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel, &nbClient);
  MPI_Comm_rank(commLevel, &clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  int* sendBuff = new int[nbClient];
  int* sendNbIndexBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i)
  {
    sendBuff[i] = 0;
    sendNbIndexBuff[i] = 0;
  }

  // Compute the size of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndex;
  std::map<int, std::vector<int> > client2ClientInfo;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  boost::unordered_map<size_t,int>::const_iterator it  = indexInfoMap.begin(),
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndex[indexClient].push_back(it->first);
        client2ClientInfo[indexClient].push_back(it->second);
      }
    }
  }

  // Calculate from how many clients each client receives a message
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbClient = recvBuff[clientRank];

  // Calculate the size of the buffer for receiving messages
  int* recvNbIndexBuff = new int[nbClient];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbIndexCount = recvNbIndexBuff[clientRank];
  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
  int* recvInfoBuff = new int[recvNbIndexCount];

  // If a client holds indices and the corresponding information which do not belong to it,
  // it sends a message to the correct clients.
  // The contents of the message are the indices and their corresponding information
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itIndex  = client2ClientIndex.begin(),
                                                iteIndex = client2ClientIndex.end();
  for (; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest);
  std::map<int, std::vector<int> >::iterator itInfo  = client2ClientInfo.begin(),
                                             iteInfo = client2ClientInfo.end();
  for (; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, commLevel, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
  std::map<int, int> countBuffInfo, countBuffIndex;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Count of the buffer for receiving global indices
  int countIndex = 0;

  // Count of the buffer for receiving server indices
  int countInfo = 0;

  // Requests returned by the MPI_Irecv calls for global indices
  std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;

  // Mapping between a client rank and the beginning position of the receiving buffer for its global-index message
  std::map<int, unsigned long*> indexBuffBegin;

  // Mapping between a client rank and the beginning position of the receiving buffer for its server-index message
  std::map<int, int*> infoBuffBegin;

  boost::unordered_map<size_t,int> indexToInfoMapping;

  // Now each client tries to listen to the demands from others.
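  // Each sending client delivers two separate messages per target: its indices (tag MPI_DHT_INDEX)
  // and the matching info values (tag MPI_DHT_INFO). A source rank is only consumed once both of
  // its messages have completed (countBuffIndex and countBuffInfo are both set for that rank),
  // and the loop ends when the recvNbClient expected senders have been processed and all local
  // sends have completed.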
  // If there is a message, it is processed: the global indices and the corresponding server are pushed into the map
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    // Process the completed requests
    for (itRequestIndex = requestRecvIndex.begin();
         itRequestIndex != requestRecvIndex.end(); ++itRequestIndex)
    {
      int rank = itRequestIndex->first;
      int count = computeBuffCountIndexGlobal(itRequestIndex->second);
      if (0 != count) countBuffIndex[rank] = count;
    }

    probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequestInfo = requestRecvInfo.begin();
         itRequestInfo != requestRecvInfo.end(); ++itRequestInfo)
    {
      int rank = itRequestInfo->first;
      int count = computeBuffCountIndexServer(itRequestInfo->second);
      if (0 != count) countBuffInfo[rank] = count;
    }

    for (std::map<int, int>::iterator it = countBuffIndex.begin();
         it != countBuffIndex.end(); ++it)
    {
      int rank = it->first;
      if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
          (countBuffIndex.end() != countBuffIndex.find(rank)))
      {
        int count = it->second;
        for (int i = 0; i < count; ++i)
          indexToInfoMapping.insert(std::make_pair(*(indexBuffBegin[rank]+i), *(infoBuffBegin[rank]+i)));
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvInfo.erase(processedList[i]);
      requestRecvIndex.erase(processedList[i]);
      countBuffIndex.erase(processedList[i]);
      countBuffInfo.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexBuff;
  delete [] recvInfoBuff;

  // Ok, now do something recursive
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, commLevel_[level], level);
  }
  else
    globalIndexToServerMapping_ = indexToInfoMapping;
}

/*!
  Probe and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients to fill their maps.
  Each client probes the messages in its queue; if a message is ready, it is put into the receiving buffer.
  \param [in] recvIndexBuff buffer dedicated to receiving global indices
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countIndex number of received indices
  \param [in] indexBuffBegin beginning of the index buffer for each source rank
  \param [in] requestRecvIndex requests for receiving indices
  \param [in] intraComm communicator
*/
void CClientClientDHT::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
                                                    const int recvNbIndexCount,
                                                    int& countIndex,
                                                    std::map<int, unsigned long*>& indexBuffBegin,
                                                    std::map<int, MPI_Request>& requestRecvIndex,
                                                    const MPI_Comm& intraComm)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global indices
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexBuffBegin.insert(std::make_pair(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
    countIndex += count;
  }
}

/*!
  Probe and receive messages containing server indices from other clients.
  Each client can send a message of server indices to other clients to fill their maps.
  Each client probes the messages in its queue; if a message is ready, it is put into the receiving buffer.
  \param [in] recvInfoBuff buffer dedicated to receiving server indices
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countInfo number of received info values
  \param [in] infoBuffBegin beginning of the info buffer for each source rank
  \param [in] requestRecvInfo requests for receiving info
  \param [in] intraComm communicator
*/
void CClientClientDHT::probeInfoMessageFromClients(int* recvInfoBuff,
                                                   const int recvNbIndexCount,
                                                   int& countInfo,
                                                   std::map<int, int*>& infoBuffBegin,
                                                   std::map<int, MPI_Request>& requestRecvInfo,
                                                   const MPI_Comm& intraComm)
{
  MPI_Status statusInfo;
  int flagInfo, count;

  // Probing for server indices
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
  {
    MPI_Get_count(&statusInfo, MPI_INT, &count);
    infoBuffBegin.insert(std::make_pair(statusInfo.MPI_SOURCE, recvInfoBuff+countInfo));
    MPI_Irecv(recvInfoBuff+countInfo, count, MPI_INT,
              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
              &requestRecvInfo[statusInfo.MPI_SOURCE]);
    countInfo += count;
  }
}

/*!
  Send a message containing indices to clients
  \param [in] clientDestRank rank of the destination client
  \param [in] indices indices to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
void CClientClientDHT::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices,
                                          const MPI_Comm& clientIntraComm,
                                          std::list<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Send a message containing information to clients
  \param [in] clientDestRank rank of the destination client
  \param [in] info server indices to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
void CClientClientDHT::sendInfoToClients(int clientDestRank, std::vector<int>& info,
                                         const MPI_Comm& clientIntraComm,
                                         std::list<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);
  MPI_Isend(&(info)[0], (info).size(), MPI_INT,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Verify the status of the sending requests
  \param [in] sendRequest sending requests to verify
*/
void CClientClientDHT::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}

/*!
  Process a received request: push the global indices and server indices into the map
  \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indices
  \param[in] buffIndexServer pointer to the beginning of the buffer containing server indices
  \param[in] count size of the received message
*/
//void CClientClientDHT::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
//{
//  for (int i = 0; i < count; ++i)
//    globalIndexToServerMapping_.insert(std::make_pair(*(buffIndexGlobal+i),*(buffIndexServer+i)));
//}
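/*
  A sketch of an equivalent way to reap completed send requests (illustrative only; the class
  uses the element-by-element MPI_Test loop above). "testSendRequestAlternative" is a
  hypothetical helper, not part of CClientClientDHT. It copies the pending requests into a
  contiguous array so MPI_Testsome can test them in a single call, then keeps only the
  requests that are still outstanding:

  \code
  void testSendRequestAlternative(std::list<MPI_Request>& sendRequest)
  {
    if (sendRequest.empty()) return;
    std::vector<MPI_Request> reqs(sendRequest.begin(), sendRequest.end());
    std::vector<int> indices(reqs.size());
    int outcount = 0;
    // Completed requests are freed and set to MPI_REQUEST_NULL inside reqs
    MPI_Testsome(static_cast<int>(reqs.size()), &reqs[0], &outcount, &indices[0], MPI_STATUSES_IGNORE);
    sendRequest.clear();
    for (size_t i = 0; i < reqs.size(); ++i)
      if (MPI_REQUEST_NULL != reqs[i]) sendRequest.push_back(reqs[i]);
  }
  \endcode
*/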
/*!
  Compute the size of a message containing global indices
  \param[in] requestRecv request of the message
*/
int CClientClientDHT::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute the size of a message containing server indices
  \param[in] requestRecv request of the message
*/
int CClientClientDHT::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

}