Index: /XIOS/trunk/inputs/REMAP/iodef.xml
===================================================================
--- /XIOS/trunk/inputs/REMAP/iodef.xml (revision 719)
+++ /XIOS/trunk/inputs/REMAP/iodef.xml (revision 720)
@@ -18,5 +18,5 @@
-
+
Index: /XIOS/trunk/inputs/Version2/iodef.xml
===================================================================
--- /XIOS/trunk/inputs/Version2/iodef.xml (revision 719)
+++ /XIOS/trunk/inputs/Version2/iodef.xml (revision 720)
@@ -19,5 +19,5 @@
-
+
@@ -34,5 +34,5 @@
-
+
Index: /XIOS/trunk/inputs/iodef.xml
===================================================================
--- /XIOS/trunk/inputs/iodef.xml (revision 719)
+++ /XIOS/trunk/inputs/iodef.xml (revision 720)
@@ -12,7 +12,7 @@
-
+
-
+
@@ -22,5 +22,5 @@
-
+
Index: /XIOS/trunk/src/client_client_dht.cpp
===================================================================
--- /XIOS/trunk/src/client_client_dht.cpp (revision 720)
+++ /XIOS/trunk/src/client_client_dht.cpp (revision 720)
@@ -0,0 +1,672 @@
+/*!
+ \file client_client_dht.cpp
+ \author Ha NGUYEN
+ \since 15 Sep 2015
+ \date 15 Sep 2015
+
+ \brief Distributed hashed table implementation.
+ */
+#include "client_client_dht.hpp"
+#include <cmath>
+#include <limits>
+#include <algorithm>
+#include "utils.hpp"
+#include "mpi_tag.hpp"
+
+namespace xios
+{
+
+CClientClientDHT::CClientClientDHT(const boost::unordered_map<size_t,int>& indexInfoMap,
+                                   const MPI_Comm& clientIntraComm, bool isDataDistributed, int hierarLvl)
+ : intraCommRoot_(clientIntraComm), commLevel_(), isDataDistributed_(isDataDistributed),
+ nbLevel_(hierarLvl), globalIndexToServerMapping_(), globalIndexToInfoMappingLevel_()
+{
+ computeMPICommLevel(clientIntraComm);
+ int lvl = commLevel_.size() - 1;
+ computeDistributedIndex(indexInfoMap, commLevel_[lvl], lvl);
+}
+
+CClientClientDHT::~CClientClientDHT()
+{
+}
+
+/*!
+ Calculate MPI communicator for each level of hierarchy.
+ \param[in] mpiCommRoot MPI communicator of the level 0 (usually communicator of all clients)
+*/
+void CClientClientDHT::computeMPICommLevel(const MPI_Comm& mpiCommRoot)
+{
+ int nbProc;
+ MPI_Comm_size(mpiCommRoot,&nbProc);
+ if (nbLevel_ > nbProc) nbLevel_ = std::log(nbProc);
+ else if (1 > nbLevel_) nbLevel_ = 1;
+
+ commLevel_.push_back(mpiCommRoot);
+ divideMPICommLevel(mpiCommRoot, nbLevel_);
+}
+
+/*!
+ Divide each MPI communicator into sub-communicators. Recursive function
+ \param [in] mpiCommLevel MPI communicator of current level
+ \param [in] level current level
+*/
+void CClientClientDHT::divideMPICommLevel(const MPI_Comm& mpiCommLevel, int level)
+{
+ int clientRank;
+ MPI_Comm_rank(mpiCommLevel,&clientRank);
+
+ --level;
+ if (0 < level)
+ {
+ int color = clientRank % 2;
+ commLevel_.push_back(MPI_Comm());
+ MPI_Comm_split(mpiCommLevel, color, 0, &(commLevel_.back()));
+ divideMPICommLevel(commLevel_.back(), level);
+ }
+}
+
+/*!
+ Compute mapping between indices and information corresponding to these indices
+ \param [in] indices indices a proc has
+*/
+void CClientClientDHT::computeServerIndexMapping(const CArray<size_t,1>& indices)
+{
+ int lvl = commLevel_.size() - 1;
+ computeIndexMapping(indices, commLevel_[lvl], lvl);
+ size_t size = indices.numElements();
+ for (size_t idx = 0; idx < size; ++idx)
+ {
+ int serverIdx = globalIndexToInfoMappingLevel_[indices(idx)];
+ indexGlobalOnServer_[serverIdx].push_back(indices(idx));
+ }
+}
+
+/*!
+ Compute mapping between indices and information corresponding to these indices
+for each level of hierarchical DHT. Recursive function
+ \param [in] indices indices a proc has
+ \param [in] commLevel communicator of current level
+ \param [in] level current level
+*/
+void CClientClientDHT::computeIndexMapping(const CArray<size_t,1>& indices,
+ const MPI_Comm& commLevel,
+ int level)
+{
+ int nbClient, clientRank;
+ MPI_Comm_size(commLevel,&nbClient);
+ MPI_Comm_rank(commLevel,&clientRank);
+ std::vector<size_t> hashedIndex;
+ computeHashIndex(hashedIndex, nbClient);
+
+ size_t ssize = indices.numElements(), hashedVal;
+
+ std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
+                                     iteClientHash = hashedIndex.end();
+ std::map<int, std::vector<size_t> > client2ClientIndex;
+
+ // Number of global indices whose mapped server can be found locally thanks to the index-server mapping
+ int nbIndexAlreadyOnClient = 0;
+
+ // Number of global indices whose mapped server is known only by other clients
+ int nbIndexSendToOthers = 0;
+ HashXIOS<size_t> hashGlobalIndex;
+ for (int i = 0; i < ssize; ++i)
+ {
+ size_t index = indices(i);
+ hashedVal = hashGlobalIndex(index);
+ itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
+ if (iteClientHash != itClientHash)
+ {
+ int indexClient = std::distance(itbClientHash, itClientHash)-1;
+ {
+ client2ClientIndex[indexClient].push_back(index);
+ ++nbIndexSendToOthers;
+ }
+ }
+ }
+ info << "level " << level << " nbIndexsendtoOther " << nbIndexSendToOthers << std::endl;
+
+ int* sendBuff = new int[nbClient];
+ for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0;
+ std::map<int, std::vector<size_t> >::iterator itb = client2ClientIndex.begin(), it,
+                                               ite = client2ClientIndex.end();
+ for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
+ int* recvBuff = new int[nbClient];
+ MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
+
+ std::list<MPI_Request> sendRequest;
+ if (0 != nbIndexSendToOthers)
+ for (it = itb; it != ite; ++it)
+ sendIndexToClients(it->first, it->second, commLevel, sendRequest);
+
+ int nbDemandingClient = recvBuff[clientRank], nbIndexServerReceived = 0;
+
+ // Receive demands as well as responses from other clients.
+ // A demand message contains global indices; a response carries the corresponding server index information.
+ // The buffer receiving demands from other clients is allocated only when there are demands.
+ // The same index may be demanded several times, so the maximum size of the demand buffer must be determined.
+ for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
+ MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
+
+ unsigned long* recvBuffIndex = 0;
+ int maxNbIndexDemandedFromOthers = recvBuff[clientRank];
+// if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * nbIndexSendToOthers; //globalIndexToServerMapping_.size(); // Not very optimal but it's general
+
+ if (0 != maxNbIndexDemandedFromOthers)
+ recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];
+
+ // Buffer for receiving responses from other clients; allocated only when this client has sent demands
+ int* recvBuffInfo = 0;
+ int nbIndexReceivedFromOthers = nbIndexSendToOthers;
+ if (0 != nbIndexReceivedFromOthers)
+ recvBuffInfo = new int[nbIndexReceivedFromOthers];
+
+ std::map<int, MPI_Request>::iterator itRequest;
+ std::vector<int> demandAlreadyReceived, repondAlreadyReceived;
+
+ // Counting of buffer for receiving global index
+ int countIndex = 0;
+
+ // Requests returned by MPI_Irecv for global indices
+ std::map<int, MPI_Request> requestRecvIndex;
+
+ // Maps a client rank to the beginning of the receive buffer for global indices from that client
+ std::map<int, unsigned long*> indexBuffBegin;
+
+ std::map<int, std::vector<size_t> > src2Index; // Temporary mapping holding demand info (source rank and associated indices) at the current level
+
+ CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);
+ int k = 0;
+
+ while ((0 < nbDemandingClient) || (!sendRequest.empty()))
+ {
+ // Check whether this client has any demands from other clients;
+ // if so, it must send responses to these client(s)
+ probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
+ countIndex, indexBuffBegin,
+ requestRecvIndex, commLevel);
+ if (0 < nbDemandingClient)
+ {
+ for (itRequest = requestRecvIndex.begin();
+ itRequest != requestRecvIndex.end(); ++itRequest)
+ {
+ int flagIndexGlobal, count;
+ MPI_Status statusIndexGlobal;
+
+ MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
+ if (true == flagIndexGlobal)
+ {
+ MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
+ int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
+ unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
+ for (int i = 0; i < count; ++i)
+ {
+ src2Index[clientSourceRank].push_back(*(beginBuff+i));
+ tmpGlobalIndexOnClient(k) = *(beginBuff+i);
+ ++k;
+ }
+ --nbDemandingClient;
+
+ demandAlreadyReceived.push_back(clientSourceRank);
+ }
+ }
+ for (int i = 0; i< demandAlreadyReceived.size(); ++i)
+ requestRecvIndex.erase(demandAlreadyReceived[i]);
+ }
+
+ testSendRequest(sendRequest);
+ }
+
+ if (0 < level)
+ {
+ --level;
+ computeIndexMapping(tmpGlobalIndexOnClient, commLevel_[level], level);
+ }
+ else
+ globalIndexToInfoMappingLevel_ = globalIndexToServerMapping_;
+
+ std::map<int, std::vector<int> > client2ClientInfo;
+ std::list<MPI_Request> sendInfoRequest;
+ std::map<int, std::vector<size_t> >::iterator itbSrc2Idx = src2Index.begin(), itSrc2Idx,
+                                               iteSrc2Idx = src2Index.end();
+ for (itSrc2Idx = itbSrc2Idx; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx)
+ {
+ int clientSourceRank = itSrc2Idx->first;
+ std::vector<size_t>& srcIdx = itSrc2Idx->second;
+ for (int idx = 0; idx < srcIdx.size(); ++idx)
+ {
+// client2ClientInfo[clientSourceRank].push_back(globalIndexToServerMapping_[srcIdx[idx]]);
+ client2ClientInfo[clientSourceRank].push_back(globalIndexToInfoMappingLevel_[srcIdx[idx]]);
+ }
+ sendInfoToClients(clientSourceRank, client2ClientInfo[clientSourceRank], commLevel, sendInfoRequest);
+ }
+
+ boost::unordered_map<size_t,int> indexToInfoMapping;
+
+ // Counting of buffer for receiving server index
+ int countInfo = 0;
+
+ std::map<int, MPI_Request> requestRecvInfo;
+
+ // Maps a client rank to the beginning of the receive buffer for server indices from that client
+ std::map<int, int*> infoBuffBegin;
+
+ while ((!sendInfoRequest.empty()) || (nbIndexServerReceived < nbIndexReceivedFromOthers))
+ {
+ testSendRequest(sendInfoRequest);
+
+ // In some cases a client needs to listen for responses from other clients about server information;
+ // with that information, the client can fill in its server-global index map.
+ probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
+ countInfo, infoBuffBegin,
+ requestRecvInfo, commLevel);
+ for (itRequest = requestRecvInfo.begin();
+ itRequest != requestRecvInfo.end();
+ ++itRequest)
+ {
+ int flagInfo, count;
+ MPI_Status statusInfo;
+
+ MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
+ if (true == flagInfo)
+ {
+ MPI_Get_count(&statusInfo, MPI_INT, &count);
+ int clientSourceRank = statusInfo.MPI_SOURCE;
+ int* beginBuff = infoBuffBegin[clientSourceRank];
+ std::vector<size_t>& globalIndexTmp = client2ClientIndex[clientSourceRank];
+ for (int i = 0; i < count; ++i)
+ {
+ indexToInfoMapping[globalIndexTmp[i]] = *(beginBuff+i);
+// globalIndexToServerMapping_[globalIndexTmp[i]] = *(beginBuff+i);
+ }
+ nbIndexServerReceived += count;
+ repondAlreadyReceived.push_back(clientSourceRank);
+ }
+ }
+
+ for (int i = 0; i< repondAlreadyReceived.size(); ++i)
+ requestRecvInfo.erase(repondAlreadyReceived[i]);
+ repondAlreadyReceived.resize(0);
+ }
+
+ globalIndexToInfoMappingLevel_ = indexToInfoMapping;
+ info << "temp " << tmpGlobalIndexOnClient << std::endl;
+ if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
+ if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
+ delete [] sendBuff;
+ delete [] recvBuff;
+}
+
+/*!
+ Compute the hash index distribution over the whole size_t space; each client is assigned one range of this distribution
+*/
+void CClientClientDHT::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
+{
+ // Compute range of hash index for each client
+ hashedIndex.resize(nbClient+1);
+ size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
+ size_t nbHashIndex;
+ hashedIndex[0] = 0;
+ for (int i = 1; i < nbClient; ++i)
+ {
+ nbHashIndex = nbHashIndexMax / nbClient;
+ if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
+ hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
+ }
+ hashedIndex[nbClient] = nbHashIndexMax;
+}
+
+/*!
+ Compute the distribution of global indices and their associated information among clients.
+ Each client already holds a piece of information and its attached index.
+This information is redistributed among the processes by projecting the indices into size_t space.
+After the redistribution, each client holds the rearranged indices and their corresponding information.
+ \param [in] indexInfoMap index and its corresponding info (usually server index)
+ \param [in] commLevel communicator of current level
+ \param [in] level current level
+*/
+void CClientClientDHT::computeDistributedIndex(const boost::unordered_map<size_t,int>& indexInfoMap,
+ const MPI_Comm& commLevel,
+ int level)
+{
+ int nbClient, clientRank;
+ MPI_Comm_size(commLevel,&nbClient);
+ MPI_Comm_rank(commLevel,&clientRank);
+ std::vector<size_t> hashedIndex;
+ computeHashIndex(hashedIndex, nbClient);
+
+ int* sendBuff = new int[nbClient];
+ int* sendNbIndexBuff = new int[nbClient];
+ for (int i = 0; i < nbClient; ++i)
+ {
+ sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
+ }
+
+ // Compute the sizes of the sending and receiving buffers
+ std::map<int, std::vector<size_t> > client2ClientIndex;
+ std::map<int, std::vector<int> > client2ClientInfo;
+
+ std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
+                                     iteClientHash = hashedIndex.end();
+ boost::unordered_map<size_t,int>::const_iterator it = indexInfoMap.begin(),
+                                                  ite = indexInfoMap.end();
+ HashXIOS<size_t> hashGlobalIndex;
+ for (; it != ite; ++it)
+ {
+ size_t hashIndex = hashGlobalIndex(it->first);
+ itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
+ if (itClientHash != iteClientHash)
+ {
+ int indexClient = std::distance(itbClientHash, itClientHash)-1;
+ {
+ sendBuff[indexClient] = 1;
+ ++sendNbIndexBuff[indexClient];
+ client2ClientIndex[indexClient].push_back(it->first);
+ client2ClientInfo[indexClient].push_back(it->second);
+ }
+ }
+ }
+
+ // Calculate from how many clients each client receives messages.
+ int* recvBuff = new int[nbClient];
+ MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
+ int recvNbClient = recvBuff[clientRank];
+
+ // Calculate size of buffer for receiving message
+ int* recvNbIndexBuff = new int[nbClient];
+ MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
+ int recvNbIndexCount = recvNbIndexBuff[clientRank];
+ unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
+ int* recvInfoBuff = new int[recvNbIndexCount];
+
+ // If a client holds indices and corresponding information that do not belong to it,
+ // it sends them to the correct clients.
+ // The message contents are the indices and their corresponding information
+ std::list<MPI_Request> sendRequest;
+ std::map<int, std::vector<size_t> >::iterator itIndex = client2ClientIndex.begin(),
+ iteIndex = client2ClientIndex.end();
+ for (; itIndex != iteIndex; ++itIndex)
+ sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest);
+ std::map<int, std::vector<int> >::iterator itInfo = client2ClientInfo.begin(),
+ iteInfo = client2ClientInfo.end();
+ for (; itInfo != iteInfo; ++itInfo)
+ sendInfoToClients(itInfo->first, itInfo->second, commLevel, sendRequest);
+
+ std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
+ std::map<int, int> countBuffInfo, countBuffIndex;
+ std::vector<int> processedList;
+
+ bool isFinished = (0 == recvNbClient) ? true : false;
+
+ // Counting of buffer for receiving global index
+ int countIndex = 0;
+
+ // Counting of buffer for receiving server index
+ int countInfo = 0;
+
+ // Requests returned by MPI_Irecv for global indices and info
+ std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;
+
+ // Maps a client rank to the beginning of the receive buffer for global indices from that client
+ std::map<int, unsigned long*> indexBuffBegin;
+
+ // Maps a client rank to the beginning of the receive buffer for server indices from that client
+ std::map<int, int*> infoBuffBegin;
+
+ boost::unordered_map<size_t,int> indexToInfoMapping;
+
+ // Now each client tries to listen for demands from the others.
+ // When a message arrives, it is processed: the global indices and the corresponding server are pushed into the map
+ while (!isFinished || (!sendRequest.empty()))
+ {
+ testSendRequest(sendRequest);
+ probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
+ countIndex, indexBuffBegin,
+ requestRecvIndex, commLevel);
+ // Processing complete request
+ for (itRequestIndex = requestRecvIndex.begin();
+ itRequestIndex != requestRecvIndex.end();
+ ++itRequestIndex)
+ {
+ int rank = itRequestIndex->first;
+ int count = computeBuffCountIndexGlobal(itRequestIndex->second);
+ if (0 != count)
+ countBuffIndex[rank] = count;
+ }
+
+ probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
+ countInfo, infoBuffBegin,
+ requestRecvInfo, commLevel);
+ for (itRequestInfo = requestRecvInfo.begin();
+ itRequestInfo != requestRecvInfo.end();
+ ++itRequestInfo)
+ {
+ int rank = itRequestInfo->first;
+ int count = computeBuffCountIndexServer(itRequestInfo->second);
+ if (0 != count)
+ countBuffInfo[rank] = count;
+ }
+
+ for (std::map<int,int>::iterator it = countBuffIndex.begin();
+ it != countBuffIndex.end(); ++it)
+ {
+ int rank = it->first;
+ if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
+ (countBuffIndex.end() != countBuffIndex.find(rank)))
+ {
+ int count = it->second;
+ for (int i = 0; i < count; ++i)
+ indexToInfoMapping.insert(std::make_pair(*(indexBuffBegin[rank]+i),*(infoBuffBegin[rank]+i)));
+ processedList.push_back(rank);
+ --recvNbClient;
+ }
+ }
+
+ for (int i = 0; i < processedList.size(); ++i)
+ {
+ requestRecvInfo.erase(processedList[i]);
+ requestRecvIndex.erase(processedList[i]);
+ countBuffIndex.erase(processedList[i]);
+ countBuffInfo.erase(processedList[i]);
+ }
+
+ if (0 == recvNbClient) isFinished = true;
+ }
+
+ delete [] sendBuff;
+ delete [] sendNbIndexBuff;
+ delete [] recvBuff;
+ delete [] recvNbIndexBuff;
+ delete [] recvIndexBuff;
+ delete [] recvInfoBuff;
+
+ // Now apply the same procedure recursively on the next level
+ if (0 < level)
+ {
+ --level;
+ computeDistributedIndex(indexToInfoMapping, commLevel_[level], level);
+ }
+ else
+ globalIndexToServerMapping_ = indexToInfoMapping;
+}
+
+/*!
+ Probe and receive messages containing global indices from other clients.
+ Each client can send a message of global indices to other clients to fill their maps.
+Each client probes for messages in its queue; once a message is ready, it is received into the buffer
+ \param [in] recvIndexBuff buffer dedicated to receiving global indices
+ \param [in] recvNbIndexCount size of the buffer
+ \param [in] countIndex number of received indices
+ \param [in] indexBuffBegin beginning of the index buffer for each source rank
+ \param [in] requestRecvIndex requests for receiving indices
+ \param [in] intraComm communicator
+*/
+void CClientClientDHT::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
+                                                    const int recvNbIndexCount,
+                                                    int& countIndex,
+                                                    std::map<int, unsigned long*>& indexBuffBegin,
+                                                    std::map<int, MPI_Request>& requestRecvIndex,
+                                                    const MPI_Comm& intraComm)
+{
+ MPI_Status statusIndexGlobal;
+ int flagIndexGlobal, count;
+
+ // Probing for global index
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
+ if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
+ {
+ MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
+ indexBuffBegin.insert(std::make_pair(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
+ MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
+ statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
+ &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
+ countIndex += count;
+ }
+}
+
+/*!
+ Probe and receive messages containing server indices from other clients.
+ Each client can send a message of server indices to other clients to fill their maps.
+Each client probes for messages in its queue; once a message is ready, it is received into the buffer
+ \param [in] recvInfoBuff buffer dedicated to receiving server indices
+ \param [in] recvNbIndexCount size of the buffer
+ \param [in] countInfo number of received info entries
+ \param [in] infoBuffBegin beginning of the info buffer for each source rank
+ \param [in] requestRecvInfo requests for receiving info
+ \param [in] intraComm communicator
+*/
+void CClientClientDHT::probeInfoMessageFromClients(int* recvInfoBuff,
+                                                   const int recvNbIndexCount,
+                                                   int& countInfo,
+                                                   std::map<int, int*>& infoBuffBegin,
+                                                   std::map<int, MPI_Request>& requestRecvInfo,
+                                                   const MPI_Comm& intraComm)
+{
+ MPI_Status statusInfo;
+ int flagInfo, count;
+
+ // Probing for server index
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
+ if ((true == flagInfo) && (countInfo < recvNbIndexCount))
+ {
+ MPI_Get_count(&statusInfo, MPI_INT, &count);
+ infoBuffBegin.insert(std::make_pair(statusInfo.MPI_SOURCE, recvInfoBuff+countInfo));
+ MPI_Irecv(recvInfoBuff+countInfo, count, MPI_INT,
+ statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
+ &requestRecvInfo[statusInfo.MPI_SOURCE]);
+
+ countInfo += count;
+ }
+}
+
+/*!
+ Send message containing index to clients
+ \param [in] clientDestRank rank of destination client
+ \param [in] indices index to send
+ \param [in] clientIntraComm communication group of client
+ \param [in] requestSendIndex list of sending request
+*/
+void CClientClientDHT::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices,
+                                          const MPI_Comm& clientIntraComm,
+                                          std::list<MPI_Request>& requestSendIndex)
+{
+ MPI_Request request;
+ requestSendIndex.push_back(request);
+ MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG,
+ clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
+}
+
+/*!
+ Send message containing information to clients
+ \param [in] clientDestRank rank of destination client
+ \param [in] info server index to send
+ \param [in] clientIntraComm communication group of client
+ \param [in] requestSendInfo list of sending request
+*/
+void CClientClientDHT::sendInfoToClients(int clientDestRank, std::vector<int>& info,
+                                         const MPI_Comm& clientIntraComm,
+                                         std::list<MPI_Request>& requestSendInfo)
+{
+ MPI_Request request;
+ requestSendInfo.push_back(request);
+ MPI_Isend(&(info)[0], (info).size(), MPI_INT,
+ clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
+}
+
+/*!
+ Verify status of sending request
+ \param [in] sendRequest sending request to verify
+*/
+void CClientClientDHT::testSendRequest(std::list<MPI_Request>& sendRequest)
+{
+ int flag = 0;
+ MPI_Status status;
+ std::list<MPI_Request>::iterator itRequest;
+ int sizeListRequest = sendRequest.size();
+ int idx = 0;
+ while (idx < sizeListRequest)
+ {
+ bool isErased = false;
+ for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
+ {
+ MPI_Test(&(*itRequest), &flag, &status);
+ if (true == flag)
+ {
+ isErased = true;
+ break;
+ }
+ }
+ if (true == isErased) sendRequest.erase(itRequest);
+ ++idx;
+ }
+}
+
+/*!
+ Process a received request: push the global indices and server indices into the map
+ \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indices
+ \param[in] buffIndexServer pointer to the beginning of the buffer containing server indices
+ \param[in] count size of received message
+*/
+//void CClientClientDHT::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
+//{
+// for (int i = 0; i < count; ++i)
+// globalIndexToServerMapping_.insert(std::make_pair(*(buffIndexGlobal+i),*(buffIndexServer+i)));
+//}
+
+/*!
+ Compute size of message containing global index
+ \param[in] requestRecv request of message
+*/
+int CClientClientDHT::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
+{
+ int flag, count = 0;
+ MPI_Status status;
+
+ MPI_Test(&requestRecv, &flag, &status);
+ if (true == flag)
+ {
+ MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
+ }
+
+ return count;
+}
+
+/*!
+ Compute size of message containing server index
+ \param[in] requestRecv request of message
+*/
+int CClientClientDHT::computeBuffCountIndexServer(MPI_Request& requestRecv)
+{
+ int flag, count = 0;
+ MPI_Status status;
+
+ MPI_Test(&requestRecv, &flag, &status);
+ if (true == flag)
+ {
+ MPI_Get_count(&status, MPI_INT, &count);
+ }
+
+ return count;
+}
+
+}
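
The routing rule at the heart of computeHashIndex() and computeIndexMapping() above is easiest to see in isolation: each client owns one contiguous range of the size_t hash space, and std::upper_bound over the range bounds, minus one position, yields the owner of a hashed key. The following is a minimal self-contained sketch, with the remainder distribution simplified and all names ours rather than the commit's:

    // Sketch only: how a hashed global index is routed to its owning client.
    // Mirrors computeHashIndex()/computeIndexMapping(); remainder handling simplified.
    #include <algorithm>
    #include <iostream>
    #include <limits>
    #include <vector>

    int main()
    {
      const int nbClient = 4;

      // Bounds of the hash ranges: client i owns [hashedIndex[i], hashedIndex[i+1])
      std::vector<size_t> hashedIndex(nbClient + 1);
      const size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
      hashedIndex[0] = 0;
      for (int i = 1; i < nbClient; ++i)
        hashedIndex[i] = hashedIndex[i-1] + nbHashIndexMax / nbClient;
      hashedIndex[nbClient] = nbHashIndexMax;

      // upper_bound returns the first bound strictly greater than the value,
      // so the owning client sits one position to its left
      const size_t samples[4] = { 0, nbHashIndexMax / 3, nbHashIndexMax / 2, nbHashIndexMax - 1 };
      for (int i = 0; i < 4; ++i)
      {
        std::vector<size_t>::iterator itClientHash =
          std::upper_bound(hashedIndex.begin(), hashedIndex.end(), samples[i]);
        int indexClient = std::distance(hashedIndex.begin(), itClientHash) - 1;
        std::cout << "hash " << samples[i] << " -> client " << indexClient << std::endl;
      }
      return 0;
    }

Because the bounds are sorted, each lookup costs O(log nbClient), which is what lets every level of the hierarchy route indices without any global table.
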
Index: /XIOS/trunk/src/client_client_dht.hpp
===================================================================
--- /XIOS/trunk/src/client_client_dht.hpp (revision 720)
+++ /XIOS/trunk/src/client_client_dht.hpp (revision 720)
@@ -0,0 +1,134 @@
+/*!
+ \file client_client_dht.hpp
+ \author Ha NGUYEN
+ \since 15 Sep 2015
+ \date 29 Sep 2015
+
+ \brief Distributed hashed table implementation.
+ */
+
+#ifndef __XIOS_CLIENT_CLIENT_DHT_HPP__
+#define __XIOS_CLIENT_CLIENT_DHT_HPP__
+
+#include "xios_spl.hpp"
+#include "array_new.hpp"
+#include "mpi.hpp"
+#include <boost/unordered_map.hpp>
+
+namespace xios
+{
+/*!
+ \class CClientClientDHT
+ This class provides features similar to \class CClientServerMappingDistributed, which implements a simple distributed hash table;
+moreover, by adding a hierarchical structure, it greatly reduces the number of communications among processes.
+*/
+class CClientClientDHT
+{
+ public:
+ /** Default constructor */
+ CClientClientDHT(const boost::unordered_map<size_t,int>& globalIndexOfServer,
+ const MPI_Comm& clientIntraComm, bool isDataDistributed = true,
+ int hierarLvl = 2);
+
+ void computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClientSendToServer);
+
+ const std::map<int, std::vector<size_t> >& getGlobalIndexOnServer() const { return indexGlobalOnServer_; }
+ const boost::unordered_map<size_t,int>& getGlobalIndexServerMapping() const { return globalIndexToServerMapping_; }
+
+ /** Default destructor */
+ virtual ~CClientClientDHT();
+
+ protected:
+ // Redistribute global indices and server indices among clients
+ void computeDistributedIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
+ const MPI_Comm& intraCommLevel,
+ int level);
+
+ void computeMPICommLevel(const MPI_Comm& mpiCommRoot);
+
+ void divideMPICommLevel(const MPI_Comm& mpiCommLevel, int level);
+
+ void computeHashIndex(std::vector<size_t>& indexClientHash, int nbClient);
+
+ virtual void computeIndexMapping(const CArray<size_t,1>& globalIndexOnClientSendToServer,
+ const MPI_Comm& intraCommLevel,
+ int level);
+
+ protected:
+ void probeIndexMessageFromClients(unsigned long* recvIndexGlobalBuff,
+                                   const int recvNbIndexCount,
+                                   int& countIndexGlobal,
+                                   std::map<int, unsigned long*>& indexGlobalBuffBegin,
+                                   std::map<int, MPI_Request>& requestRecvIndexGlobal,
+                                   const MPI_Comm& intraComm);
+
+ void probeInfoMessageFromClients(int* recvIndexServerBuff,
+                                  const int recvNbIndexCount,
+                                  int& countIndexServer,
+                                  std::map<int, int*>& indexServerBuffBegin,
+                                  std::map<int, MPI_Request>& requestRecvIndexServer,
+                                  const MPI_Comm& intraComm);
+
+ // Send server indices to clients
+ void sendInfoToClients(int clientDestRank, std::vector<int>& indexServer,
+                        const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendIndexServer);
+
+ // Send global indices to clients
+ void sendIndexToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
+                         const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendIndexGlobal);
+
+ // Verify sending requests
+ void testSendRequest(std::list<MPI_Request>& sendRequest);
+
+ // Compute size of receiving buffer for global index
+ int computeBuffCountIndexGlobal(MPI_Request& requestRecv);
+
+ // Compute size of receiving buffer for server index
+ int computeBuffCountIndexServer(MPI_Request& requestRecv);
+
+ protected:
+ //! Mapping of global indices to the corresponding server
+ boost::unordered_map<size_t,int> globalIndexToServerMapping_;
+
+ //! A temporary mapping of index to the corresponding information in each level of hierarchy
+ boost::unordered_map<size_t,int> globalIndexToInfoMappingLevel_;
+ std::vector<MPI_Comm> commLevel_;
+
+ int nbLevel_;
+
+ //! Global indices of data on the SERVER, computed by the client(s)
+ std::map<int, std::vector<size_t> > indexGlobalOnServer_;
+
+// //! Number of client
+// int nbClient_;
+//
+// //! Rank of client
+// int clientRank_;
+
+// //! Counting of buffer for receiving global index
+// int countIndexGlobal_;
+//
+// //! Counting of buffer for receiving server index
+// int countIndexServer_;
+
+ //! Intra-communicator of clients
+ MPI_Comm intraCommRoot_;
+
+// //! Request returned by MPI_Irecv function about global index
+// std::map<int, MPI_Request> requestRecvIndexGlobal_;
+//
+// //! Request returned by MPI_Irecv function about index of server
+// std::map<int, MPI_Request> requestRecvIndexServer_;
+//
+// //! Mapping client rank and the beginning position of receiving buffer for message of global index from this client
+// std::map<int, unsigned long*> indexGlobalBuffBegin_;
+//
+// //! Mapping client rank and the beginning position of receiving buffer for message of server index from this client
+// std::map<int, int*> indexServerBuffBegin_;
+
+ //! Flag to specify whether data is distributed or not
+ bool isDataDistributed_;
+};
+
+} // namespace xios
+#endif // __XIOS_CLIENT_CLIENT_DHT_HPP__
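
Taken together, the header reads as: build the DHT once from the locally known piece of the index-to-server map, then query it for arbitrary indices. Below is a minimal usage sketch under the assumption of an initialized XIOS/MPI environment; the indices and server ranks are invented for illustration (the grid.cpp hunk later in this changeset exercises the class the same way):

    // Hypothetical usage sketch, not part of the commit: assumes an XIOS build
    // environment and a valid intra-communicator of all clients.
    #include "client_client_dht.hpp"

    void exampleUsage(const MPI_Comm& clientIntraComm)
    {
      using namespace xios;

      // The piece of the global mapping this client happens to know:
      // global index -> server index (values here are invented)
      boost::unordered_map<size_t,int> indexInfoMap;
      indexInfoMap[42]   = 0;
      indexInfoMap[1337] = 1;

      // Constructing the DHT redistributes the mapping over hashed index ranges
      CClientClientDHT dht(indexInfoMap, clientIntraComm, true);

      // Query indices this client needs, wherever their mapping is stored
      CArray<size_t,1> wanted(2);
      wanted(0) = 42; wanted(1) = 1337;
      dht.computeServerIndexMapping(wanted);

      // Result: server rank -> global indices this client must send to it
      const std::map<int, std::vector<size_t> >& onServer = dht.getGlobalIndexOnServer();
      (void)onServer;
    }
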
Index: /XIOS/trunk/src/client_server_mapping_distributed.cpp
===================================================================
--- /XIOS/trunk/src/client_server_mapping_distributed.cpp (revision 719)
+++ /XIOS/trunk/src/client_server_mapping_distributed.cpp (revision 720)
@@ -12,4 +12,6 @@
#include <limits>
#include "utils.hpp"
+#include "client_client_dht.hpp"
+#include "mpi_tag.hpp"
namespace xios
@@ -19,5 +21,6 @@
const MPI_Comm& clientIntraComm, bool isDataDistributed)
: CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0),
- indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed)
+ indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed),
+ ccDHT_(0)
{
clientIntraComm_ = clientIntraComm;
@@ -25,9 +28,19 @@
MPI_Comm_rank(clientIntraComm,&clientRank_);
computeHashIndex();
- computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
+
+ ccDHT_ = new CClientClientDHT(globalIndexOfServer,
+ clientIntraComm,
+ isDataDistributed);
+// const boost::unordered_map<size_t,int>& globalIndexToServerMappingTmp = clientDht.getGlobalIndexServerMapping();
+// globalIndexToServerMapping_ = clientDht.getGlobalIndexServerMapping();
+
+
+
+// computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
}
CClientServerMappingDistributed::~CClientServerMappingDistributed()
{
+ if (0 != ccDHT_) delete ccDHT_;
}
@@ -38,4 +51,8 @@
void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient)
{
+ ccDHT_->computeServerIndexMapping(globalIndexOnClient);
+ indexGlobalOnServer_ = ccDHT_->getGlobalIndexOnServer();
+
+/*
size_t ssize = globalIndexOnClient.numElements(), hashedIndex;
@@ -177,4 +194,5 @@
delete [] sendBuff;
delete [] recvBuff;
+*/
}
@@ -348,5 +366,5 @@
// Probing for global index
- MPI_Iprobe(MPI_ANY_SOURCE, 15, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
{
@@ -354,5 +372,5 @@
indexGlobalBuffBegin_.insert(std::make_pair(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
- statusIndexGlobal.MPI_SOURCE, 15, clientIntraComm_,
+ statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_,
&requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
countIndexGlobal_ += count;
@@ -373,5 +391,5 @@
// Probing for server index
- MPI_Iprobe(MPI_ANY_SOURCE, 12, clientIntraComm_, &flagIndexServer, &statusIndexServer);
+ MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO_0, clientIntraComm_, &flagIndexServer, &statusIndexServer);
if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
{
@@ -379,5 +397,5 @@
indexServerBuffBegin_.insert(std::make_pair(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
- statusIndexServer.MPI_SOURCE, 12, clientIntraComm_,
+ statusIndexServer.MPI_SOURCE, MPI_DHT_INFO_0, clientIntraComm_,
&requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);
@@ -400,5 +418,5 @@
requestSendIndexGlobal.push_back(request);
MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG,
- clientDestRank, 15, clientIntraComm, &(requestSendIndexGlobal.back()));
+ clientDestRank, MPI_DHT_INDEX_0, clientIntraComm, &(requestSendIndexGlobal.back()));
}
@@ -417,5 +435,5 @@
requestSendIndexServer.push_back(request);
MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT,
- clientDestRank, 12, clientIntraComm, &(requestSendIndexServer.back()));
+ clientDestRank, MPI_DHT_INFO_0, clientIntraComm, &(requestSendIndexServer.back()));
}
Index: /XIOS/trunk/src/client_server_mapping_distributed.hpp
===================================================================
--- /XIOS/trunk/src/client_server_mapping_distributed.hpp (revision 719)
+++ /XIOS/trunk/src/client_server_mapping_distributed.hpp (revision 720)
@@ -17,4 +17,5 @@
#include "mpi.hpp"
#include <boost/unordered_map.hpp>
+#include "client_client_dht.hpp"
namespace xios
@@ -118,4 +119,7 @@
//! Flag to specify whether data is distributed or not
bool isDataDistributed_;
+
+
+ CClientClientDHT* ccDHT_;
};
Index: /XIOS/trunk/src/mpi_tag.hpp
===================================================================
--- /XIOS/trunk/src/mpi_tag.hpp (revision 720)
+++ /XIOS/trunk/src/mpi_tag.hpp (revision 720)
@@ -0,0 +1,16 @@
+#ifndef __XIOS_MPI_TAG_HPP__
+#define __XIOS_MPI_TAG_HPP__
+
+/* Tag for MPI communication to send and receive indices in the distributed hash table, version 2 */
+#define MPI_DHT_INDEX 15
+
+/* Tag for MPI communication to send and receive info in the distributed hash table, version 2 */
+#define MPI_DHT_INFO 12
+
+/* Tag for MPI communication to send and receive indices in the distributed hash table, version 1 */
+#define MPI_DHT_INDEX_0 25
+
+/* Tag for MPI communication to send and receive info in the distributed hash table, version 1 */
+#define MPI_DHT_INFO_0 22
+
+#endif
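
Distinct tags keep concurrent index and info traffic on the same communicator from being confused, and the _0 variants keep the retagged version-1 exchanges in CClientServerMappingDistributed from colliding with the new DHT's. The probe-then-receive pattern the tags support looks roughly like the sketch below (our own illustration, not code from the commit); the receiver learns the message size from MPI_Get_count before posting the receive:

    // Sketch only (not from the commit) of the probe-then-receive pattern used
    // with these tags. Assumes MPI_Init has been called and comm has >= 2 ranks.
    #include "mpi_tag.hpp"
    #include <mpi.h>
    #include <vector>

    void probeReceiveExample(MPI_Comm comm)
    {
      int rank;
      MPI_Comm_rank(comm, &rank);

      if (1 == rank)
      {
        // Sender: nonblocking send of global indices with the DHT index tag
        std::vector<unsigned long> indices(3, 7ul);
        MPI_Request sendReq;
        MPI_Isend(&indices[0], indices.size(), MPI_UNSIGNED_LONG,
                  0, MPI_DHT_INDEX, comm, &sendReq);
        MPI_Wait(&sendReq, MPI_STATUS_IGNORE);
      }
      else if (0 == rank)
      {
        // Receiver: probe for a pending message with the matching tag, size the
        // buffer from the status, then post the receive on that exact source/tag
        int flag = 0;
        MPI_Status status;
        while (!flag) MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, comm, &flag, &status);

        int count;
        MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
        std::vector<unsigned long> buff(count);
        MPI_Recv(&buff[0], count, MPI_UNSIGNED_LONG,
                 status.MPI_SOURCE, MPI_DHT_INDEX, comm, MPI_STATUS_IGNORE);
      }
    }
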
Index: /XIOS/trunk/src/node/domain.cpp
===================================================================
--- /XIOS/trunk/src/node/domain.cpp (revision 719)
+++ /XIOS/trunk/src/node/domain.cpp (revision 720)
@@ -272,6 +272,4 @@
fillInRectilinearLonLat();
this->isRedistributed_ = true;
- info <<"now, we are here " << std::endl;
- info << "domain " << this->getId() << " ni " << ni.getValue() << " nj " << nj.getValue() << std::endl;
}
}
Index: /XIOS/trunk/src/node/grid.cpp
===================================================================
--- /XIOS/trunk/src/node/grid.cpp (revision 719)
+++ /XIOS/trunk/src/node/grid.cpp (revision 720)
@@ -18,4 +18,5 @@
#include "grid_transformation.hpp"
#include "grid_generate.hpp"
+#include "client_client_dht.hpp"
namespace xios {
@@ -158,5 +159,5 @@
/*!
* Test whether the data defined on the grid can be outputted in a compressed way.
- *
+ *
* \return true if and only if a mask was defined for this grid
*/
@@ -395,6 +396,30 @@
clientDistribution_->isDataDistributed());
+ CClientClientDHT clientDht(serverDistributionDescription.getGlobalIndexRange(),
+ client->intraComm,
+ clientDistribution_->isDataDistributed());
+ clientDht.computeServerIndexMapping(clientDistribution_->getGlobalIndex());
+ const std::map<int, std::vector<size_t> >& globalIndexOnServer0 = clientDht.getGlobalIndexOnServer();
+
+ std::map<int, std::vector<size_t> >::const_iterator itbTmp, itTmp, iteTmp;
+ itbTmp = globalIndexOnServer0.begin(); iteTmp = globalIndexOnServer0.end();
+ for (itTmp = itbTmp; itTmp != iteTmp; ++itTmp)
+ {
+ const std::vector<size_t>& tmpVec = itTmp->second; info << "tmpVec0. Rank " << itTmp->first << ". Size = " << tmpVec.size() << ". " ;
+ for (int i = 0; i < tmpVec.size(); ++i) info << tmpVec[i] << " ";
+ info << std::endl;
+ }
+//
clientServerMap_->computeServerIndexMapping(clientDistribution_->getGlobalIndex());
const std::map<int, std::vector<size_t> >& globalIndexOnServer = clientServerMap_->getGlobalIndexOnServer();
+
+ itbTmp = globalIndexOnServer.begin(); iteTmp = globalIndexOnServer.end();
+ for (itTmp = itbTmp; itTmp != iteTmp; ++itTmp)
+ {
+ const std::vector<size_t>& tmpVec = itTmp->second; info << "tmpVec1. Rank " << itTmp->first << ". Size = " << tmpVec.size() << ". " ;
+ for (int i = 0; i < tmpVec.size(); ++i) info << tmpVec[i] << " ";
+ info << std::endl;
+ }
+
const std::vector<size_t>& globalIndexSendToServer = clientDistribution_->getGlobalDataIndexSendToServer();
std::map<int, std::vector<size_t> >::const_iterator iteGlobalMap, itbGlobalMap, itGlobalMap;
Index: /XIOS/trunk/src/test/test_client.f90
===================================================================
--- /XIOS/trunk/src/test/test_client.f90 (revision 719)
+++ /XIOS/trunk/src/test/test_client.f90 (revision 720)
@@ -15,6 +15,6 @@
CHARACTER(len=15) :: calendar_type
TYPE(xios_context) :: ctx_hdl
- INTEGER,PARAMETER :: ni_glo=100
- INTEGER,PARAMETER :: nj_glo=100
+ INTEGER,PARAMETER :: ni_glo=10
+ INTEGER,PARAMETER :: nj_glo=10
INTEGER,PARAMETER :: llm=5
DOUBLE PRECISION :: lval(llm)=1