Timestamp:
04/08/16 15:00:15 (8 years ago)
Author:
mhnguyen
Message:

Improvements for DHT

+) Implement an adaptive hierarchy for the DHT; the number of hierarchy levels depends on the number of processes
+) Remove some redundant code

Test
+) On Curie
+) All tests pass
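
The hierarchy depth is now derived from the communicator size instead of being passed in as hierarLvl. A minimal sketch of the idea in plain C++/MPI, assuming a hypothetical helper name and group-size threshold (neither is the actual XIOS implementation):

#include <mpi.h>

// Hypothetical sketch: derive the number of DHT hierarchy levels from the
// process count rather than taking it as a constructor argument.
int adaptiveNbLevel(const MPI_Comm& comm)
{
  int nbProc;
  MPI_Comm_size(comm, &nbProc);
  const int groupSize = 512;   // assumed number of processes per group
  int nbLevel = 1;
  while (nbProc > groupSize)   // one more level each time a group overflows
  {
    nbProc = (nbProc + groupSize - 1) / groupSize;
    ++nbLevel;
  }
  return nbLevel;
}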

File:
1 edited

  • XIOS/trunk/src/client_client_dht_template_impl.hpp

    r832 r833  
    2525                                                        const MPI_Comm& clientIntraComm, 
    2626                                                        int hierarLvl) 
    27   : index2InfoMapping_(), indexToInfoMappingLevel_() 
    28 { 
    29   this->computeMPICommLevel(clientIntraComm, hierarLvl); 
    30   int lvl = this->commLevel_.size() - 1; 
    31   computeDistributedIndex(indexInfoMap, this->commLevel_[lvl], lvl); 
     27  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_() 
     28{ 
     29  this->computeMPICommLevel(); 
     30  int nbLvl = this->getNbLevel(); 
     31  sendRank_.resize(nbLvl); 
     32  recvRank_.resize(nbLvl); 
     33  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1); 
    3234} 
    3335 
     
    4446void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices) 
    4547{ 
    46   int lvl = this->commLevel_.size() - 1; 
    47   computeIndexInfoMappingLevel(indices, this->commLevel_[lvl], lvl); 
     48  int nbLvl = this->getNbLevel(); 
     49  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1); 
    4850} 
    4951 
     
    6062                                                                 int level) 
    6163{ 
    62   int nbClient, clientRank; 
    63   MPI_Comm_size(commLevel,&nbClient); 
     64  int clientRank; 
    6465  MPI_Comm_rank(commLevel,&clientRank); 
     66  int groupRankBegin = this->getGroupBegin()[level]; 
     67  int nbClient = this->getNbInGroup()[level]; 
    6568  std::vector<size_t> hashedIndex; 
    6669  computeHashIndex(hashedIndex, nbClient); 
     
    7073  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash, 
    7174                                      iteClientHash = hashedIndex.end(); 
    72   std::map<int, std::vector<size_t> > client2ClientIndex; 
     75  std::vector<int> sendBuff(nbClient,0); 
     76  std::vector<int> sendNbIndexBuff(nbClient,0); 
    7377 
    7478  // Number of global indices whose mapping servers are on other clients
    7579  int nbIndexToSend = 0; 
     80  size_t index; 
    7681  HashXIOS<size_t> hashGlobalIndex; 
    7782  for (int i = 0; i < ssize; ++i) 
    7883  { 
    79     size_t index = indices(i); 
     84    index = indices(i); 
    8085    hashedVal  = hashGlobalIndex(index); 
    8186    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal); 
    82     if (iteClientHash != itClientHash) 
     87    int indexClient = std::distance(itbClientHash, itClientHash)-1; 
     88    ++sendNbIndexBuff[indexClient]; 
     89  } 
     90 
     91  std::map<int, size_t* > client2ClientIndex; 
     92  for (int idx = 0; idx < nbClient; ++idx) 
     93  { 
     94    if (0 != sendNbIndexBuff[idx]) 
     95    { 
     96      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]]; 
     97      nbIndexToSend += sendNbIndexBuff[idx]; 
     98      sendBuff[idx] = 1; 
     99      sendNbIndexBuff[idx] = 0; 
     100    } 
     101  } 
     102 
     103  for (int i = 0; i < ssize; ++i) 
     104  { 
     105    index = indices(i); 
     106    hashedVal  = hashGlobalIndex(index); 
     107    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal); 
    83108    { 
    84109      int indexClient = std::distance(itbClientHash, itClientHash)-1; 
    85110      { 
    86         client2ClientIndex[indexClient].push_back(index); 
    87         ++nbIndexToSend; 
     111        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
     112        ++sendNbIndexBuff[indexClient]; 
    88113      } 
    89114    } 
    90115  } 
    91116 
    92   int* sendBuff = new int[nbClient]; 
    93   for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0; 
    94   std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndex.begin(), it, 
    95                                                 ite  = client2ClientIndex.end(); 
    96   for (it = itb; it != ite; ++it) sendBuff[it->first] = 1; 
    97   int* recvBuff = new int[nbClient]; 
    98   MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel); 
     117  int recvNbClient, recvNbIndexCount; 
     118  sendRecvRank(level, sendBuff, sendNbIndexBuff, 
     119               recvNbClient, recvNbIndexCount); 
     120 
     121  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex, 
     122                                                iteIndex = client2ClientIndex.end(); 
    99123 
    100124  std::list<MPI_Request> sendIndexRequest; 
    101   if (0 != nbIndexToSend) 
    102       for (it = itb; it != ite; ++it) 
    103          sendIndexToClients(it->first, it->second, commLevel, sendIndexRequest); 
    104  
    105   int nbDemandingClient = recvBuff[clientRank], nbSendBuffInfoReceived = 0; 
     125  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) 
     126     sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendIndexRequest); 
     127 
     128  int nbDemandingClient = recvNbClient; //recvBuff[clientRank], 
     129  int nbSendBuffInfoReceived = 0; 
    106130 
    107131  // Receiving demands as well as the responses from other clients
     
    109133  // Buffer to receive demands from other clients; it may or may not be allocated depending on whether there are any demands
    110134  // In some cases duplicate indices are demanded, so the maximum size of the demand buffer must be determined
    111   for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size(); 
    112   MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel); 
    113  
    114135  unsigned long* recvBuffIndex = 0; 
    115   int maxNbIndexDemandedFromOthers = recvBuff[clientRank]; 
    116  
     136  int maxNbIndexDemandedFromOthers = recvNbIndexCount; 
    117137  if (0 != maxNbIndexDemandedFromOthers) 
    118138    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers]; 
    119139 
    120140  // Buffer to receive responses from other clients; it may or may not be allocated depending on whether this client demands from others
    121 //  InfoType* recvBuffInfo = 0; 
    122141  unsigned char* recvBuffInfo = 0; 
    123142  int nbIndexReceivedFromOthers = nbIndexToSend; 
     
    181200  { 
    182201    --level; 
    183     computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->commLevel_[level], level); 
     202    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->internalComm_, level); 
    184203  } 
    185204  else 
     
    234253        int clientSourceRank = statusInfo.MPI_SOURCE; 
    235254        unsigned char* beginBuff = infoBuffBegin[clientSourceRank]; 
    236         std::vector<size_t>& indexTmp = client2ClientIndex[clientSourceRank]; 
     255        size_t* indexTmp = client2ClientIndex[clientSourceRank]; 
    237256        int infoIndex = 0; 
    238257        for (int i = 0; i < actualCountInfo; ++i) 
     
    253272  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex; 
    254273  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo; 
     274  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second; 
    255275  for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx]; 
    256   delete [] sendBuff; 
    257   delete [] recvBuff; 
    258276} 
    259277 
     
    292310                                                            int level) 
    293311{ 
    294   int nbClient, clientRank; 
    295   MPI_Comm_size(commLevel,&nbClient); 
     312  int clientRank; 
    296313  MPI_Comm_rank(commLevel,&clientRank); 
     314  computeSendRecvRank(level, clientRank); 
     315 
     316  int groupRankBegin = this->getGroupBegin()[level]; 
     317  int nbClient = this->getNbInGroup()[level]; 
    297318  std::vector<size_t> hashedIndex; 
    298319  computeHashIndex(hashedIndex, nbClient); 
    299320 
    300   int* sendBuff = new int[nbClient]; 
    301   int* sendNbIndexBuff = new int[nbClient]; 
    302   for (int i = 0; i < nbClient; ++i) 
    303   { 
    304     sendBuff[i] = 0; sendNbIndexBuff[i] = 0; 
    305   } 
    306  
    307   // Compute size of sending and receiving buffer
    308   std::map<int, std::vector<size_t> > client2ClientIndex; 
    309   std::map<int, std::vector<InfoType> > client2ClientInfo; 
    310  
     321  std::vector<int> sendBuff(nbClient,0); 
     322  std::vector<int> sendNbIndexBuff(nbClient,0); 
    311323  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash, 
    312324                                      iteClientHash = hashedIndex.end(); 
    313   typename boost::unordered_map<size_t,InfoType>::const_iterator it  = indexInfoMap.begin(), 
     325  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(),it, 
    314326                                                                 ite = indexInfoMap.end(); 
    315327  HashXIOS<size_t> hashGlobalIndex; 
    316   for (; it != ite; ++it) 
     328 
     329  // Compute size of sending and receiving buffer
     330  for (it = itb; it != ite; ++it) 
    317331  { 
    318332    size_t hashIndex = hashGlobalIndex(it->first); 
    319333    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex); 
    320     if (itClientHash != iteClientHash) 
    321334    { 
    322335      int indexClient = std::distance(itbClientHash, itClientHash)-1; 
    323336      { 
    324         sendBuff[indexClient] = 1; 
    325337        ++sendNbIndexBuff[indexClient]; 
    326         client2ClientIndex[indexClient].push_back(it->first); 
    327         client2ClientInfo[indexClient].push_back(it->second); 
    328338      } 
    329339    } 
    330340  } 
    331341 
     342  std::map<int, size_t*> client2ClientIndex; 
     343  std::map<int, unsigned char*> client2ClientInfo; 
     344  for (int idx = 0; idx < nbClient; ++idx) 
     345  { 
     346    if (0 != sendNbIndexBuff[idx]) 
     347    { 
     348      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]]; 
     349      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()]; 
     350      sendNbIndexBuff[idx] = 0; 
     351      sendBuff[idx] = 1; 
     352    } 
     353  } 
     354 
     355  std::vector<int> sendNbInfo(nbClient,0); 
     356  for (it = itb; it != ite; ++it) 
     357  { 
     358    size_t hashIndex = hashGlobalIndex(it->first); 
     359    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex); 
     360    { 
     361      int indexClient = std::distance(itbClientHash, itClientHash)-1; 
     362      { 
     363        client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
     364        ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]); 
     365        ++sendNbIndexBuff[indexClient]; 
     366      } 
     367    } 
     368  } 
     369 
    332370  // Calculate from how many clients each client receives messages.
    333   int* recvBuff = new int[nbClient]; 
    334   MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel); 
    335   int recvNbClient = recvBuff[clientRank]; 
    336  
    337371  // Calculate the size of the buffer for receiving messages
    338   int* recvNbIndexBuff = new int[nbClient]; 
    339   MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel); 
    340   int recvNbIndexCount = recvNbIndexBuff[clientRank]; 
    341   unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount]; 
    342   unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()]; 
     372  int recvNbClient, recvNbIndexCount; 
     373  sendRecvRank(level, sendBuff, sendNbIndexBuff, 
     374               recvNbClient, recvNbIndexCount); 
    343375 
    344376  // If a client holds information about indices and the corresponding values that don't belong to it,
     
    346378  // Contents of the message are indices and their corresponding information
    347379  std::list<MPI_Request> sendRequest; 
    348   std::map<int, std::vector<size_t> >::iterator itIndex  = client2ClientIndex.begin(), 
    349                                                 iteIndex = client2ClientIndex.end(); 
    350   for (; itIndex != iteIndex; ++itIndex) 
    351     sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest); 
    352   typename std::map<int, std::vector<InfoType> >::iterator itbInfo = client2ClientInfo.begin(), itInfo, 
    353                                                            iteInfo = client2ClientInfo.end(); 
    354  
    355   std::vector<int> infoSizeToSend(client2ClientInfo.size(),0); 
    356   std::vector<unsigned char*> infoToSend(client2ClientInfo.size()); 
    357   itInfo = itbInfo; 
    358   for (int idx = 0; itInfo != iteInfo; ++itInfo, ++idx) 
    359   { 
    360     const std::vector<InfoType>& infoVec = itInfo->second; 
    361     int infoVecSize = infoVec.size(); 
    362     std::vector<int> infoIndex(infoVecSize); 
    363     for (int i = 0; i < infoVecSize; ++i) 
    364     { 
    365       infoIndex[i] = infoSizeToSend[idx]; 
    366       ProcessDHTElement<InfoType>::packElement(infoVec[i], NULL, infoSizeToSend[idx]); 
    367     } 
    368  
    369     infoToSend[idx] = new unsigned char[infoSizeToSend[idx]]; 
    370     infoSizeToSend[idx] = 0; 
    371     for (int i = 0; i < infoVecSize; ++i) 
    372     { 
    373       ProcessDHTElement<InfoType>::packElement(infoVec[i], infoToSend[idx], infoSizeToSend[idx]); 
    374     } 
    375  
    376     sendInfoToClients(itInfo->first, infoToSend[idx], infoSizeToSend[idx], commLevel, sendRequest); 
    377   } 
    378  
     380  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex, 
     381                                    iteIndex = client2ClientIndex.end(); 
     382  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) 
     383    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendRequest); 
     384  std::map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo, 
     385                                          iteInfo = client2ClientInfo.end(); 
     386  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) 
     387    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, sendRequest); 
     388 
     389 
     390  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount]; 
     391  unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()]; 
    379392 
    380393  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo; 
     
    465478  } 
    466479 
    467   for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx]; 
    468   delete [] sendBuff; 
    469   delete [] sendNbIndexBuff; 
    470   delete [] recvBuff; 
    471   delete [] recvNbIndexBuff; 
     480  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second; 
     481  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) delete [] itInfo->second; 
    472482  delete [] recvIndexBuff; 
    473483  delete [] recvInfoBuff; 
     
    477487  { 
    478488    --level; 
    479     computeDistributedIndex(indexToInfoMapping, this->commLevel_[level], level); 
     489    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level); 
    480490  } 
    481491  else 
     
    563573*/ 
    564574template<typename T, typename H> 
    565 void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices, 
     575void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize, 
    566576                                                       const MPI_Comm& clientIntraComm, 
    567577                                                       std::list<MPI_Request>& requestSendIndex) 
     
    569579  MPI_Request request; 
    570580  requestSendIndex.push_back(request); 
    571   MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG, 
     581  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG, 
    572582            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back())); 
    573583} 
     
    658668} 
    659669 
    660 } 
     670/*! 
      671  Compute how many processes one process needs to send to, and from how many it will receive
     672*/ 
     673template<typename T, typename H> 
     674void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank) 
     675{ 
     676  int groupBegin = this->getGroupBegin()[level]; 
     677  int nbInGroup  = this->getNbInGroup()[level]; 
     678  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level]; 
     679  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level]; 
     680 
     681  std::vector<size_t> hashedIndexGroup; 
     682  computeHashIndex(hashedIndexGroup, nbInGroup); 
     683  size_t a = hashedIndexGroup[rank-groupBegin]; 
     684  size_t b = hashedIndexGroup[rank-groupBegin+1]-1; 
     685 
     686  int currentGroup, offset; 
     687  size_t e,f; 
     688 
      689  // Simple interval intersection test: does [a,b] intersect [c,d]?
     690  for (int idx = 0; idx < groupParentBegin.size(); ++idx) 
     691  { 
     692    std::vector<size_t> hashedIndexGroupParent; 
     693    int nbInGroupParent = nbInGroupParents[idx]; 
     694    if (0 != nbInGroupParent) 
     695      computeHashIndex(hashedIndexGroupParent, nbInGroupParent); 
     696    for (int i = 0; i < nbInGroupParent; ++i) 
     697    { 
     698      size_t c = hashedIndexGroupParent[i]; 
     699      size_t d = hashedIndexGroupParent[i+1]-1; 
     700 
      701      if (!((d < a) || (b < c)))
     702        recvRank_[level].push_back(groupParentBegin[idx]+i); 
     703    } 
     704 
     705    offset = rank - groupParentBegin[idx]; 
     706    if ((offset<nbInGroupParents[idx]) && (0 <= offset)) 
     707    { 
     708      e = hashedIndexGroupParent[offset]; 
     709      f = hashedIndexGroupParent[offset+1]-1; 
     710    } 
     711  } 
     712 
     713  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup, 
     714                                      iteHashGroup = hashedIndexGroup.end(); 
     715  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1); 
     716  int begin = std::distance(itbHashGroup, itHashGroup)-1; 
     717  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f); 
     718  int end = std::distance(itbHashGroup, itHashGroup) -1; 
     719  sendRank_[level].resize(end-begin+1); 
     720  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin; 
     721} 
     722 
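
The test !((d < a) || (b < c)) in computeSendRecvRank above is the standard overlap check for two closed intervals [a,b] and [c,d]: a parent rank is recorded in recvRank_ exactly when its hash range intersects this process's range. A standalone illustration:

#include <cassert>
#include <cstddef>

// Two closed intervals [a,b] and [c,d] overlap unless one lies entirely
// before the other.
bool overlaps(size_t a, size_t b, size_t c, size_t d)
{
  return !((d < a) || (b < c));
}

int main()
{
  assert( overlaps(0, 10, 5, 20));   // partial overlap
  assert( overlaps(0, 10, 10, 20));  // a shared endpoint still overlaps
  assert(!overlaps(0, 10, 11, 20));  // disjoint
  return 0;
}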
     723/*! 
      724  Send and receive the number of processes each process needs to listen to, as well as the
      725  number of indices it will receive
     726*/ 
     727template<typename T, typename H> 
     728void CClientClientDHTTemplate<T,H>::sendRecvRank(int level, 
     729                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements, 
     730                                                 int& recvNbRank, int& recvNbElements) 
     731{ 
     732  int groupBegin = this->getGroupBegin()[level]; 
     733 
     734  int offSet = 0; 
     735  std::vector<int>& sendRank = sendRank_[level]; 
     736  std::vector<int>& recvRank = recvRank_[level]; 
     737  int sendBuffSize = sendRank.size(); 
     738  int* sendBuff = new int [sendBuffSize*2]; 
     739  std::vector<MPI_Request> request(sendBuffSize); 
     740  std::vector<MPI_Status> requestStatus(sendBuffSize); 
     741  int recvBuffSize = recvRank.size(); 
     742  int* recvBuff = new int [2]; 
     743 
     744  for (int idx = 0; idx < sendBuffSize; ++idx) 
     745  { 
     746    offSet = sendRank[idx]-groupBegin; 
     747    sendBuff[idx*2] = sendNbRank[offSet]; 
     748    sendBuff[idx*2+1] = sendNbElements[offSet]; 
     749  } 
     750 
     751  for (int idx = 0; idx < sendBuffSize; ++idx) 
     752  { 
     753    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT, 
     754              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[idx]); 
     755  } 
     756 
     757  MPI_Status status; 
     758  int nbRecvRank = 0, nbRecvElements = 0; 
     759  for (int idx = 0; idx < recvBuffSize; ++idx) 
     760  { 
     761    MPI_Recv(recvBuff, 2, MPI_INT, 
     762             recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &status); 
     763    nbRecvRank += *(recvBuff); 
     764    nbRecvElements += *(recvBuff+1); 
     765  } 
     766 
     767  MPI_Waitall(sendBuffSize, &request[0], &requestStatus[0]); 
     768 
     769  recvNbRank = nbRecvRank; 
     770  recvNbElements = nbRecvElements; 
     771 
     772  delete [] sendBuff; 
     773  delete [] recvBuff; 
     774} 
     775 
     776} 
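
The count exchange in sendRecvRank reduces to a small point-to-point protocol: post one non-blocking send of a (rank count, element count) pair per partner in sendRank_[level], then blocking-receive and accumulate one pair per partner in recvRank_[level]. A simplified, self-contained sketch; the per-partner payloads are collapsed to a single pair and tag 1 stands in for MPI_DHT_INDEX_1:

#include <mpi.h>
#include <vector>

// Simplified sketch of the sendRecvRank exchange: send (nbRank, nbElements)
// to every partner in sendTo and sum the pairs received from every partner
// in recvFrom. The partner lists are assumed consistent across ranks.
void exchangeCounts(const std::vector<int>& sendTo,
                    const std::vector<int>& recvFrom,
                    int nbRank, int nbElements, MPI_Comm comm,
                    int& recvNbRank, int& recvNbElements)
{
  int sendBuff[2] = { nbRank, nbElements };
  std::vector<MPI_Request> requests(sendTo.size());
  for (int i = 0; i < (int)sendTo.size(); ++i)
    MPI_Isend(sendBuff, 2, MPI_INT, sendTo[i], 1, comm, &requests[i]);

  recvNbRank = 0;
  recvNbElements = 0;
  int recvBuff[2];
  for (int i = 0; i < (int)recvFrom.size(); ++i)
  {
    MPI_Recv(recvBuff, 2, MPI_INT, recvFrom[i], 1, comm, MPI_STATUS_IGNORE);
    recvNbRank += recvBuff[0];
    recvNbElements += recvBuff[1];
  }
  if (!requests.empty())
    MPI_Waitall((int)requests.size(), &requests[0], MPI_STATUSES_IGNORE);
}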