Changeset 839


Ignore:
Timestamp:
04/15/16 14:27:42 (8 years ago)
Author:
mhnguyen
Message:

Correcting a bug in dht

+) The exchange message (MPI_Isend,MPI_Irecv) must be finished in each level
+) If there are no corresponding information found, dht will return a empty array.
+) Remove some redundant codes

Test
+) On Curie
+) Up to 40 cores (3 levels)
+) All tests pass

Location:
XIOS/trunk/src
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/client_client_dht_template.hpp

    r835 r839  
    33   \author Ha NGUYEN 
    44   \since 01 Oct 2015 
    5    \date 06 Oct 2015 
     5   \date 15 April 2016 
    66 
    77   \brief Distributed hashed table implementation. 
     
    6464    void sendRecvRank(int level, 
    6565                      const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements, 
    66                       int& recvNbRank, int& recvNbElements); 
     66                      std::vector<int>& recvNbRank, std::vector<int>& recvNbElements); 
    6767 
    6868  protected: 
    69     void probeIndexMessageFromClients(unsigned long* recvIndexGlobalBuff, 
    70                                       const int recvNbIndexCount, 
    71                                       int& countIndexGlobal, 
    72                                       std::map<int, unsigned long*>& indexGlobalBuffBegin, 
    73                                       std::map<int, MPI_Request>& requestRecvIndexGlobal, 
    74                                       const MPI_Comm& intraComm); 
    75  
    76     void probeInfoMessageFromClients(unsigned char* recvIndexServerBuff, 
    77                                      const int recvNbIndexCount, 
    78                                      int& countIndexServer, 
    79                                      std::map<int, unsigned char*>& infoBuffBegin, 
    80                                      std::map<int, MPI_Request>& requestRecvIndexServer, 
    81                                      const MPI_Comm& intraComm); 
    82  
    8369    // Send information to clients 
    8470    void sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize, 
    85                            const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendIndexServer); 
     71                           const MPI_Comm& clientIntraComm, 
     72                           std::vector<MPI_Request>& requestSendInfo); 
     73 
     74    void recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize, 
     75                            const MPI_Comm& clientIntraComm, 
     76                            std::vector<MPI_Request>& requestRecvInfo); 
    8677 
    8778    // Send global index to clients 
    8879    void sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize, 
    89                             const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendIndexGlobal); 
     80                            const MPI_Comm& clientIntraComm, 
     81                            std::vector<MPI_Request>& requestSendIndexGlobal); 
    9082 
    91     // Verify sending request 
    92     void testSendRequest(std::list<MPI_Request>& sendRequest); 
     83    void recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize, 
     84                             const MPI_Comm& clientIntraComm, 
     85                             std::vector<MPI_Request>& requestRecvIndex); 
    9386 
    94     // Compute size of receiving buffer for global index 
    95     int computeBuffCountIndex(MPI_Request& requestRecv); 
    96  
    97     // Compute size of receiving buffer for server index 
    98     int computeBuffCountInfo(MPI_Request& requestRecv); 
     87    void sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements, 
     88                          const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements); 
    9989 
    10090  protected: 
  • XIOS/trunk/src/client_client_dht_template_impl.hpp

    r835 r839  
    108108      int indexClient = std::distance(itbClientHash, itClientHash)-1; 
    109109      { 
    110         client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;; 
     110        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index; 
    111111        ++sendNbIndexBuff[indexClient]; 
    112112      } 
     
    114114  } 
    115115 
    116   int recvNbClient, recvNbIndexCount; 
     116  std::vector<int> recvRankClient, recvNbIndexClientCount; 
    117117  sendRecvRank(level, sendBuff, sendNbIndexBuff, 
    118                recvNbClient, recvNbIndexCount); 
     118               recvRankClient, recvNbIndexClientCount); 
     119 
     120  int recvNbIndexCount = 0; 
     121  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx) 
     122    recvNbIndexCount += recvNbIndexClientCount[idx]; 
     123 
     124  unsigned long* recvIndexBuff; 
     125  if (0 != recvNbIndexCount) 
     126    recvIndexBuff = new unsigned long[recvNbIndexCount]; 
     127 
     128  std::vector<MPI_Request> request; 
     129  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex, 
     130                             iteRecvIndex = recvRankClient.end(), 
     131                           itbRecvNbIndex = recvNbIndexClientCount.begin(), 
     132                           itRecvNbIndex; 
     133  int currentIndex = 0; 
     134  int nbRecvClient = recvRankClient.size(); 
     135  for (int idx = 0; idx < nbRecvClient; ++idx) 
     136  { 
     137    if (0 != recvNbIndexClientCount[idx]) 
     138      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request); 
     139    currentIndex += recvNbIndexClientCount[idx]; 
     140  } 
    119141 
    120142  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex, 
    121                                                 iteIndex = client2ClientIndex.end(); 
    122  
    123   std::list<MPI_Request> sendIndexRequest; 
     143                                    iteIndex = client2ClientIndex.end(); 
    124144  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) 
    125      sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendIndexRequest); 
    126  
    127   int nbDemandingClient = recvNbClient; //recvBuff[clientRank], 
    128   int nbSendBuffInfoReceived = 0; 
    129  
    130   // Receiving demand as well as the responds from other clients 
    131   // The demand message contains global index; meanwhile the responds have server index information 
    132   // Buffer to receive demand from other clients, it can be allocated or not depending whether it has demand(s) 
    133   // There are some cases we demand duplicate index so need to determine maxium size of demanding buffer 
    134   unsigned long* recvBuffIndex = 0; 
    135   int maxNbIndexDemandedFromOthers = recvNbIndexCount; 
    136   if (0 != maxNbIndexDemandedFromOthers) 
    137     recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers]; 
    138  
    139   // Buffer to receive respond from other clients, it can be allocated or not depending whether it demands other clients 
    140   unsigned char* recvBuffInfo = 0; 
    141   int nbIndexReceivedFromOthers = nbIndexToSend; 
    142   if (0 != nbIndexReceivedFromOthers) 
    143     recvBuffInfo = new unsigned char[nbIndexReceivedFromOthers*ProcessDHTElement<InfoType>::typeSize()]; 
    144  
    145   std::map<int, MPI_Request>::iterator itRequest; 
    146   std::vector<int> demandAlreadyReceived, repondAlreadyReceived; 
    147  
    148   int countIndex = 0;  // Counting of buffer for receiving index 
    149   std::map<int, MPI_Request> requestRecvIndex; // Request returned by MPI_IRecv function about index 
    150  
    151   // Mapping client rank and the beginning position of receiving buffer for message of index from this client 
    152   std::map<int, unsigned long*> indexBuffBegin; 
    153  
    154   std::map<int,std::vector<size_t> > src2Index; // Temporary mapping contains info of demand (source and associate index) in curren level 
    155  
    156   CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers); 
    157  
    158   int k = 0; 
    159   while ((0 < nbDemandingClient) || (!sendIndexRequest.empty())) 
    160   { 
    161     // Just check whether a client has any demand from other clients. 
    162     // If it has, then it should send responds to these client(s) 
    163     probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers, 
    164                                  countIndex, indexBuffBegin, 
    165                                  requestRecvIndex, commLevel); 
    166     if (0 < nbDemandingClient) 
    167     { 
    168       for (itRequest = requestRecvIndex.begin(); 
    169            itRequest != requestRecvIndex.end(); ++itRequest) 
     145    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request); 
     146 
     147  std::vector<MPI_Status> status(request.size()); 
     148  MPI_Waitall(request.size(), &request[0], &status[0]); 
     149 
     150  CArray<size_t,1>* tmpGlobalIndex; 
     151  if (0 != recvNbIndexCount) 
     152    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData); 
     153  else 
     154    tmpGlobalIndex = new CArray<size_t,1>(); 
     155 
     156  // OK, we go to the next level and do something recursive 
     157  if (0 < level) 
     158  { 
     159    --level; 
     160    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level); 
     161  } 
     162  else // Now, we are in the last level where necessary mappings are. 
     163    indexToInfoMappingLevel_= (index2InfoMapping_); 
     164 
     165  typename Index2InfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap; 
     166  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0); 
     167  currentIndex = 0; 
     168  for (int idx = 0; idx < nbRecvClient; ++idx) 
     169  { 
     170    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i) 
     171    { 
     172      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i)); 
     173      if (iteIndexToInfoMap != itIndexToInfoMap) ++sendNbIndexOnReturn[idx]; 
     174    } 
     175    currentIndex += recvNbIndexClientCount[idx]; 
     176  } 
     177 
     178  std::vector<int> recvRankOnReturn(client2ClientIndex.size()); 
     179  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0); 
     180  int indexIndex = 0; 
     181  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex) 
     182  { 
     183    recvRankOnReturn[indexIndex] = itIndex->first; 
     184  } 
     185  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn, 
     186                   recvRankOnReturn, recvNbIndexOnReturn); 
     187 
     188  int recvNbIndexCountOnReturn = 0; 
     189  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx) 
     190    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx]; 
     191 
     192  unsigned long* recvIndexBuffOnReturn; 
     193  unsigned char* recvInfoBuffOnReturn; 
     194  if (0 != recvNbIndexCountOnReturn) 
     195  { 
     196    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn]; 
     197    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()]; 
     198  } 
     199 
     200  std::vector<MPI_Request> requestOnReturn; 
     201  currentIndex = 0; 
     202  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx) 
     203  { 
     204    if (0 != recvNbIndexOnReturn[idx]) 
     205    { 
     206      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn); 
     207      recvInfoFromClients(recvRankOnReturn[idx], 
     208                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(), 
     209                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), 
     210                          commLevel, requestOnReturn); 
     211    } 
     212    currentIndex += recvNbIndexOnReturn[idx]; 
     213  } 
     214 
     215  std::map<int,unsigned char*> client2ClientInfoOnReturn; 
     216  std::map<int,size_t*> client2ClientIndexOnReturn; 
     217  currentIndex = 0; 
     218  for (int idx = 0; idx < nbRecvClient; ++idx) 
     219  { 
     220    if (0 != sendNbIndexOnReturn[idx]) 
     221    { 
     222      int rank = recvRankClient[idx]; 
     223      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]]; 
     224      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()]; 
     225      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank]; 
     226      int infoIndex = 0; 
     227      int nb = 0; 
     228      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i) 
    170229      { 
    171         int flagIndexGlobal, count; 
    172         MPI_Status statusIndexGlobal; 
    173  
    174         MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal); 
    175         if (true == flagIndexGlobal) 
     230        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i)); 
     231        if (iteIndexToInfoMap != itIndexToInfoMap) 
    176232        { 
    177           MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count); 
    178           int clientSourceRank = statusIndexGlobal.MPI_SOURCE; 
    179           unsigned long* beginBuff = indexBuffBegin[clientSourceRank]; 
    180           for (int i = 0; i < count; ++i) 
    181           { 
    182             src2Index[clientSourceRank].push_back(*(beginBuff+i)); 
    183             tmpGlobalIndexOnClient(k) = *(beginBuff+i); 
    184             ++k; 
    185           } 
    186           --nbDemandingClient; 
    187  
    188           demandAlreadyReceived.push_back(clientSourceRank); 
     233          client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first; 
     234          ProcessDHTElement<InfoType>::packElement(itIndexToInfoMap->second, tmpInfoPtr, infoIndex); 
     235          ++nb; 
    189236        } 
    190237      } 
    191       for (int i = 0; i< demandAlreadyReceived.size(); ++i) 
    192         requestRecvIndex.erase(demandAlreadyReceived[i]); 
    193     } 
    194  
    195     testSendRequest(sendIndexRequest); 
    196   } 
    197  
    198   if (0 < level) 
    199   { 
    200     --level; 
    201     computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->internalComm_, level); 
    202   } 
    203   else 
    204     indexToInfoMappingLevel_ = index2InfoMapping_; 
    205  
    206   std::map<int, std::vector<InfoType> > client2ClientInfo; 
    207   std::vector<unsigned char*> infoToSend(src2Index.size()); 
    208   std::list<MPI_Request> sendInfoRequest; 
    209   std::map<int, std::vector<size_t> >::iterator itSrc2Idx = src2Index.begin(), 
    210                                                 iteSrc2Idx = src2Index.end(); 
    211   for (int i=0; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx, ++i) 
    212   { 
    213     int clientSourceRank = itSrc2Idx->first; 
    214     std::vector<size_t>& srcIdx = itSrc2Idx->second; 
    215     infoToSend[i] = new unsigned char [srcIdx.size()*ProcessDHTElement<InfoType>::typeSize()]; 
    216     int infoIndex = 0; 
    217     for (int idx = 0; idx < srcIdx.size(); ++idx) 
    218     { 
    219       ProcessDHTElement<InfoType>::packElement(indexToInfoMappingLevel_[srcIdx[idx]], infoToSend[i], infoIndex); 
    220     } 
    221     sendInfoToClients(clientSourceRank, infoToSend[i], infoIndex, commLevel, sendInfoRequest); 
    222   } 
     238 
     239      sendIndexToClients(rank, client2ClientIndexOnReturn[rank], 
     240                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn); 
     241      sendInfoToClients(rank, client2ClientInfoOnReturn[rank], 
     242                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn); 
     243    } 
     244    currentIndex += recvNbIndexClientCount[idx]; 
     245  } 
     246 
     247  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size()); 
     248  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]); 
    223249 
    224250  boost::unordered_map<size_t,InfoType> indexToInfoMapping; 
    225   int countInfo = 0; // Counting of buffer for receiving server index 
    226   std::map<int, MPI_Request> requestRecvInfo; 
    227  
    228   // Mapping client rank and the begining position of receiving buffer for message of server index from this client 
    229   std::map<int, unsigned char*> infoBuffBegin; 
    230  
    231   while ((!sendInfoRequest.empty()) || (nbSendBuffInfoReceived < nbIndexReceivedFromOthers)) 
    232   { 
    233     testSendRequest(sendInfoRequest); 
    234  
    235     // In some cases, a client need to listen respond from other clients about server information 
    236     // Ok, with the information, a client can fill in its server-global index map. 
    237     probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers, 
    238                                 countInfo, infoBuffBegin, 
    239                                 requestRecvInfo, commLevel); 
    240     for (itRequest = requestRecvInfo.begin(); 
    241          itRequest != requestRecvInfo.end(); 
    242          ++itRequest) 
    243     { 
    244       int flagInfo, count; 
    245       MPI_Status statusInfo; 
    246  
    247       MPI_Test(&(itRequest->second), &flagInfo, &statusInfo); 
    248       if (true == flagInfo) 
    249       { 
    250         MPI_Get_count(&statusInfo, MPI_CHAR, &count); 
    251         int actualCountInfo = count/infoTypeSize; 
    252         int clientSourceRank = statusInfo.MPI_SOURCE; 
    253         unsigned char* beginBuff = infoBuffBegin[clientSourceRank]; 
    254         size_t* indexTmp = client2ClientIndex[clientSourceRank]; 
    255         int infoIndex = 0; 
    256         for (int i = 0; i < actualCountInfo; ++i) 
    257         { 
    258           ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[indexTmp[i]], beginBuff, infoIndex); 
    259         } 
    260         nbSendBuffInfoReceived += actualCountInfo; 
    261         repondAlreadyReceived.push_back(clientSourceRank); 
    262       } 
    263     } 
    264  
    265     for (int i = 0; i< repondAlreadyReceived.size(); ++i) 
    266       requestRecvInfo.erase(repondAlreadyReceived[i]); 
    267     repondAlreadyReceived.resize(0); 
    268   } 
    269  
    270   indexToInfoMappingLevel_.swap(indexToInfoMapping); 
    271   if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex; 
    272   if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo; 
    273   for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second; 
    274   for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx]; 
     251  int infoIndex = 0; 
     252  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx) 
     253  { 
     254    ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex); 
     255  } 
     256 
     257  indexToInfoMappingLevel_ = (indexToInfoMapping); 
     258  if (0 != recvNbIndexCount) delete [] recvIndexBuff; 
     259  for (std::map<int,size_t*>::const_iterator it = client2ClientIndex.begin(); 
     260                                            it != client2ClientIndex.end(); ++it) 
     261      delete [] it->second; 
     262  delete tmpGlobalIndex; 
     263 
     264  if (0 != recvNbIndexCountOnReturn) 
     265  { 
     266    delete [] recvIndexBuffOnReturn; 
     267    delete [] recvInfoBuffOnReturn; 
     268  } 
     269 
     270  for (std::map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin(); 
     271                                                    it != client2ClientInfoOnReturn.end(); ++it) 
     272      delete [] it->second; 
     273 
     274  for (std::map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin(); 
     275                                            it != client2ClientIndexOnReturn.end(); ++it) 
     276      delete [] it->second; 
    275277} 
    276278 
     
    297299/*! 
    298300  Compute distribution of global index for servers 
    299   Each client already holds a piece of information and its attached index. 
    300 This information will be redistributed among processes by projecting indices into size_t space. 
     301  Each client already holds a piece of information and its associated index. 
     302This information will be redistributed among processes by projecting indices into size_t space, 
     303the corresponding information will be also distributed on size_t space. 
    301304After the redistribution, each client holds rearranged index and its corresponding information. 
    302305  \param [in] indexInfoMap index and its corresponding info (usually server index) 
     
    369372  // Calculate from how many clients each client receive message. 
    370373  // Calculate size of buffer for receiving message 
    371   int recvNbClient, recvNbIndexCount; 
     374  std::vector<int> recvRankClient, recvNbIndexClientCount; 
    372375  sendRecvRank(level, sendBuff, sendNbIndexBuff, 
    373                recvNbClient, recvNbIndexCount); 
     376               recvRankClient, recvNbIndexClientCount); 
     377 
     378  int recvNbIndexCount = 0; 
     379  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx) 
     380    recvNbIndexCount += recvNbIndexClientCount[idx]; 
     381 
     382  unsigned long* recvIndexBuff; 
     383  unsigned char* recvInfoBuff; 
     384  if (0 != recvNbIndexCount) 
     385  { 
     386    recvIndexBuff = new unsigned long[recvNbIndexCount]; 
     387    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()]; 
     388  } 
    374389 
    375390  // If a client holds information about index and the corresponding which don't belong to it, 
    376391  // it will send a message to the correct clients. 
    377392  // Contents of the message are index and its corresponding informatioin 
    378   std::list<MPI_Request> sendRequest; 
     393  std::vector<MPI_Request> request; 
     394  int currentIndex = 0; 
     395  int nbRecvClient = recvRankClient.size(); 
     396  for (int idx = 0; idx < nbRecvClient; ++idx) 
     397  { 
     398    if (0 != recvNbIndexClientCount[idx]) 
     399    { 
     400      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request); 
     401      recvInfoFromClients(recvRankClient[idx], 
     402                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(), 
     403                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(), 
     404                          commLevel, request); 
     405    } 
     406    currentIndex += recvNbIndexClientCount[idx]; 
     407  } 
     408 
    379409  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex, 
    380410                                    iteIndex = client2ClientIndex.end(); 
    381411  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) 
    382     sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendRequest); 
     412    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request); 
    383413  std::map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo, 
    384414                                          iteInfo = client2ClientInfo.end(); 
    385415  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) 
    386     sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, sendRequest); 
    387  
    388  
    389   unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount]; 
    390   unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()]; 
    391  
    392   std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo; 
    393   std::map<int, int> countBuffInfo, countBuffIndex; 
    394   std::vector<int> processedList; 
    395  
    396   bool isFinished = (0 == recvNbClient) ? true : false; 
    397  
    398   // Counting of buffer for receiving global index 
    399   int countIndex = 0; 
    400  
    401   // Counting of buffer for receiving server index 
    402   int countInfo = 0; 
    403  
    404   // Request returned by MPI_IRecv function about global index 
    405   std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo; 
    406  
    407   // Mapping client rank and the beginning position of receiving buffer for message of global index from this client 
    408   std::map<int, unsigned long*> indexBuffBegin; 
    409  
    410   // Mapping client rank and the begining position of receiving buffer for message of server index from this client 
    411   std::map<int, unsigned char*> infoBuffBegin; 
     416    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request); 
     417 
     418  std::vector<MPI_Status> status(request.size()); 
     419  MPI_Waitall(request.size(), &request[0], &status[0]); 
    412420 
    413421  boost::unordered_map<size_t,InfoType> indexToInfoMapping; 
    414  
    415   // Now each client trys to listen to demand from others. 
    416   // If they have message, it processes: pushing global index and corresponding server to its map 
    417   while (!isFinished || (!sendRequest.empty())) 
    418   { 
    419     testSendRequest(sendRequest); 
    420     probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount, 
    421                                  countIndex, indexBuffBegin, 
    422                                  requestRecvIndex, commLevel); 
    423     // Processing complete request 
    424     for (itRequestIndex = requestRecvIndex.begin(); 
    425          itRequestIndex != requestRecvIndex.end(); 
    426          ++itRequestIndex) 
    427     { 
    428       int rank = itRequestIndex->first; 
    429       int count = computeBuffCountIndex(itRequestIndex->second); 
    430       if (0 != count) 
    431         countBuffIndex[rank] = count; 
    432     } 
    433  
    434     probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount, 
    435                                 countInfo, infoBuffBegin, 
    436                                 requestRecvInfo, commLevel); 
    437     for (itRequestInfo = requestRecvInfo.begin(); 
    438          itRequestInfo != requestRecvInfo.end(); 
    439          ++itRequestInfo) 
    440     { 
    441       int rank = itRequestInfo->first; 
    442       int count = computeBuffCountInfo(itRequestInfo->second); 
    443       if (0 != count) 
    444         countBuffInfo[rank] = count; 
    445     } 
    446  
    447     for (std::map<int, int>::iterator it = countBuffIndex.begin(); 
    448                                       it != countBuffIndex.end(); ++it) 
    449     { 
    450       int rank = it->first; 
    451       if ((countBuffInfo.end() != countBuffInfo.find(rank)) && 
    452           (countBuffIndex.end() != countBuffIndex.find(rank))) 
    453       { 
    454         int count = it->second; 
    455         InfoType infoValue; 
    456         int infoIndex = 0; 
    457         for (int i = 0; i < count; ++i) 
    458         { 
    459           ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuffBegin[rank], infoIndex); 
    460           indexToInfoMapping.insert(std::make_pair<size_t,InfoType>(*(indexBuffBegin[rank]+i),infoValue)); 
    461         } 
    462  
    463         processedList.push_back(rank); 
    464         --recvNbClient; 
    465       } 
    466     } 
    467  
    468     for (int i = 0; i < processedList.size(); ++i) 
    469     { 
    470       requestRecvInfo.erase(processedList[i]); 
    471       requestRecvIndex.erase(processedList[i]); 
    472       countBuffIndex.erase(processedList[i]); 
    473       countBuffInfo.erase(processedList[i]); 
    474     } 
    475  
    476     if (0 == recvNbClient) isFinished = true; 
    477   } 
    478  
    479   for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second; 
    480   for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) delete [] itInfo->second; 
    481   delete [] recvIndexBuff; 
    482   delete [] recvInfoBuff; 
     422  currentIndex = 0; 
     423  InfoType infoValue; 
     424  int infoIndex = 0; 
     425  unsigned char* infoBuff = recvInfoBuff; 
     426  for (int idx = 0; idx < nbRecvClient; ++idx) 
     427  { 
     428    int count = recvNbIndexClientCount[idx]; 
     429    for (int i = 0; i < count; ++i) 
     430    { 
     431      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex); 
     432      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)] = infoValue; 
     433    } 
     434    currentIndex += count; 
     435  } 
     436 
     437  if (0 != recvNbIndexCount) 
     438  { 
     439    delete [] recvIndexBuff; 
     440    delete [] recvInfoBuff; 
     441  } 
     442  for (std::map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin(); 
     443                                                    it != client2ClientInfo.end(); ++it) 
     444      delete [] it->second; 
     445 
     446  for (std::map<int,size_t*>::const_iterator it = client2ClientIndex.begin(); 
     447                                            it != client2ClientIndex.end(); ++it) 
     448      delete [] it->second; 
    483449 
    484450  // Ok, now do something recursive 
     
    489455  } 
    490456  else 
    491     index2InfoMapping_ = indexToInfoMapping; 
    492 } 
    493  
    494 /*! 
    495   Probe and receive message containg global index from other clients. 
    496   Each client can send a message of global index to other clients to fulfill their maps. 
    497 Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer 
    498   \param [in] recvIndexBuff buffer dedicated for receiving global index 
    499   \param [in] recvNbIndexCount size of the buffer 
    500   \param [in] countIndex number of received index 
    501   \param [in] indexBuffBegin beginning of index buffer for each source rank 
    502   \param [in] requestRecvIndex request of receving index 
    503   \param [in] intraComm communicator 
    504 */ 
    505 template<typename T, typename H> 
    506 void CClientClientDHTTemplate<T,H>::probeIndexMessageFromClients(unsigned long* recvIndexBuff, 
    507                                                                  const int recvNbIndexCount, 
    508                                                                  int& countIndex, 
    509                                                                  std::map<int, unsigned long*>& indexBuffBegin, 
    510                                                                  std::map<int, MPI_Request>& requestRecvIndex, 
    511                                                                  const MPI_Comm& intraComm) 
    512 { 
    513   MPI_Status statusIndexGlobal; 
    514   int flagIndexGlobal, count; 
    515  
    516   // Probing for global index 
    517   MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal); 
    518   if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount)) 
    519   { 
    520     MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count); 
    521     indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex)); 
    522     MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG, 
    523               statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm, 
    524               &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]); 
    525     countIndex += count; 
    526   } 
    527 } 
    528  
    529 /*! 
    530   Probe and receive message containg server index from other clients. 
    531   Each client can send a message of server index to other clients to fulfill their maps. 
    532 Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer 
    533   \param [in] recvInfoBuff buffer dedicated for receiving server index 
    534   \param [in] recvNbIndexCount size of the buffer 
    535   \param [in] countInfo number of received info 
    536   \param [in] infoBuffBegin beginning of index buffer for each source rank 
    537   \param [in] requestRecvInfo request of receving index 
    538   \param [in] intraComm communicator 
    539 */ 
    540 template<typename T, typename H> 
    541 void CClientClientDHTTemplate<T,H>::probeInfoMessageFromClients(unsigned char* recvInfoBuff, 
    542                                                                 const int recvNbIndexCount, 
    543                                                                 int& countInfo, 
    544                                                                 std::map<int, unsigned char*>& infoBuffBegin, 
    545                                                                 std::map<int, MPI_Request>& requestRecvInfo, 
    546                                                                 const MPI_Comm& intraComm) 
    547 { 
    548   MPI_Status statusInfo; 
    549   int flagInfo, count; 
    550  
    551   // Probing for server index 
    552   MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo); 
    553   if ((true == flagInfo) && (countInfo < recvNbIndexCount)) 
    554   { 
    555     MPI_Get_count(&statusInfo, MPI_CHAR, &count); 
    556     unsigned char* beginInfoBuff = recvInfoBuff+countInfo*infoTypeSize; 
    557     infoBuffBegin.insert(std::make_pair<int, unsigned char*>(statusInfo.MPI_SOURCE, beginInfoBuff)); 
    558     MPI_Irecv(beginInfoBuff, count, MPI_CHAR, 
    559               statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm, 
    560               &requestRecvInfo[statusInfo.MPI_SOURCE]); 
    561  
    562     countInfo += count/infoTypeSize; 
    563   } 
     457    index2InfoMapping_ = (indexToInfoMapping); 
    564458} 
    565459 
     
    568462  \param [in] clientDestRank rank of destination client 
    569463  \param [in] indices index to send 
     464  \param [in] indiceSize size of index array to send 
    570465  \param [in] clientIntraComm communication group of client 
    571466  \param [in] requestSendIndex list of sending request 
     
    574469void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize, 
    575470                                                       const MPI_Comm& clientIntraComm, 
    576                                                        std::list<MPI_Request>& requestSendIndex) 
     471                                                       std::vector<MPI_Request>& requestSendIndex) 
    577472{ 
    578473  MPI_Request request; 
     
    583478 
    584479/*! 
     480  Receive message containing index to clients 
     481  \param [in] clientDestRank rank of destination client 
     482  \param [in] indices index to send 
     483  \param [in] clientIntraComm communication group of client 
     484  \param [in] requestRecvIndex list of receiving request 
     485*/ 
     486template<typename T, typename H> 
     487void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize, 
     488                                                         const MPI_Comm& clientIntraComm, 
     489                                                         std::vector<MPI_Request>& requestRecvIndex) 
     490{ 
     491  MPI_Request request; 
     492  requestRecvIndex.push_back(request); 
     493  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG, 
     494            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back())); 
     495} 
     496 
     497/*! 
    585498  Send message containing information to clients 
    586499  \param [in] clientDestRank rank of destination client 
    587   \param [in] info server index to send 
     500  \param [in] info info array to send 
     501  \param [in] infoSize info array size to send 
    588502  \param [in] clientIntraComm communication group of client 
    589503  \param [in] requestSendInfo list of sending request 
     
    591505template<typename T, typename H> 
    592506void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize, 
    593                        const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendInfo) 
     507                                                      const MPI_Comm& clientIntraComm, 
     508                                                      std::vector<MPI_Request>& requestSendInfo) 
    594509{ 
    595510  MPI_Request request; 
     
    601516 
    602517/*! 
    603   Verify status of sending request 
    604   \param [in] sendRequest sending request to verify 
    605 */ 
    606 template<typename T, typename H> 
    607 void CClientClientDHTTemplate<T,H>::testSendRequest(std::list<MPI_Request>& sendRequest) 
    608 { 
    609   int flag = 0; 
    610   MPI_Status status; 
    611   std::list<MPI_Request>::iterator itRequest; 
    612   int sizeListRequest = sendRequest.size(); 
    613   int idx = 0; 
    614   while (idx < sizeListRequest) 
    615   { 
    616     bool isErased = false; 
    617     for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest) 
    618     { 
    619       MPI_Test(&(*itRequest), &flag, &status); 
    620       if (true == flag) 
    621       { 
    622         isErased = true; 
    623         break; 
    624       } 
    625     } 
    626     if (true == isErased) sendRequest.erase(itRequest); 
    627     ++idx; 
    628   } 
    629 } 
    630  
    631 /*! 
    632   Compute size of message containing global index 
    633   \param[in] requestRecv request of message 
    634 */ 
    635 template<typename T, typename H> 
    636 int CClientClientDHTTemplate<T,H>::computeBuffCountIndex(MPI_Request& requestRecv) 
    637 { 
    638   int flag, count = 0; 
    639   MPI_Status status; 
    640  
    641   MPI_Test(&requestRecv, &flag, &status); 
    642   if (true == flag) 
    643   { 
    644     MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count); 
    645   } 
    646  
    647   return count; 
    648 } 
    649  
    650 /*! 
    651   Compute size of message containing server index 
    652   \param[in] requestRecv request of message 
    653 */ 
    654 template<typename T, typename H> 
    655 int CClientClientDHTTemplate<T,H>::computeBuffCountInfo(MPI_Request& requestRecv) 
    656 { 
    657   int flag, count = 0; 
    658   MPI_Status status; 
    659  
    660   MPI_Test(&requestRecv, &flag, &status); 
    661   if (true == flag) 
    662   { 
    663     MPI_Get_count(&status, MPI_CHAR, &count); 
    664   } 
    665  
    666   return (count/infoTypeSize); 
    667 } 
    668  
    669 /*! 
    670   Compute how many processes one process needs to send to and from how many processes it will receive 
     518  Receive message containing information from other clients 
     519  \param [in] clientDestRank rank of destination client 
     520  \param [in] info info array to receive 
     521  \param [in] infoSize info array size to receive 
     522  \param [in] clientIntraComm communication group of client 
     523  \param [in] requestRecvInfo list of sending request 
     524*/ 
     525template<typename T, typename H> 
     526void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize, 
     527                                                        const MPI_Comm& clientIntraComm, 
     528                                                        std::vector<MPI_Request>& requestRecvInfo) 
     529{ 
     530  MPI_Request request; 
     531  requestRecvInfo.push_back(request); 
     532 
     533  MPI_Irecv(info, infoSize, MPI_CHAR, 
     534            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back())); 
     535} 
     536 
     537/*! 
     538  Compute how many processes one process needs to send to and from how many processes it will receive. 
     539  This computation is only based on hierachical structure of distributed hash table (e.x: number of processes) 
    671540*/ 
    672541template<typename T, typename H> 
     
    721590 
    722591/*! 
     592  Compute number of clients as well as corresponding number of elements each client will receive on returning searching result 
     593  \param [in] sendNbRank Rank of clients to send to 
     594  \param [in] sendNbElements Number of elements each client to send to 
     595  \param [in] receiveNbRank Rank of clients to receive from 
     596  \param [out] recvNbElements Number of elements each client to send to 
     597*/ 
     598template<typename T, typename H> 
     599void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements, 
     600                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements) 
     601{ 
     602  recvNbElements.resize(recvNbRank.size()); 
     603  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size()); 
     604  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size()); 
     605 
     606  int nRequest = 0; 
     607  for (int idx = 0; idx < recvNbRank.size(); ++idx) 
     608  { 
     609    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT, 
     610              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]); 
     611    ++nRequest; 
     612  } 
     613 
     614  for (int idx = 0; idx < sendNbRank.size(); ++idx) 
     615  { 
     616    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT, 
     617              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]); 
     618    ++nRequest; 
     619  } 
     620 
     621  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]); 
     622} 
     623 
     624/*! 
    723625  Send and receive number of process each process need to listen to as well as number 
    724   of index it will receive 
     626  of index it will receive during the initalization phase 
     627  \param [in] level current level 
     628  \param [in] sendNbRank Rank of clients to send to 
     629  \param [in] sendNbElements Number of elements each client to send to 
     630  \param [out] receiveNbRank Rank of clients to receive from 
     631  \param [out] recvNbElements Number of elements each client to send to 
    725632*/ 
    726633template<typename T, typename H> 
    727634void CClientClientDHTTemplate<T,H>::sendRecvRank(int level, 
    728635                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements, 
    729                                                  int& recvNbRank, int& recvNbElements) 
     636                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements) 
    730637{ 
    731638  int groupBegin = this->getGroupBegin()[level]; 
     
    735642  std::vector<int>& recvRank = recvRank_[level]; 
    736643  int sendBuffSize = sendRank.size(); 
    737   int* sendBuff = new int [sendBuffSize*2]; 
    738   std::vector<MPI_Request> request(sendBuffSize); 
    739   std::vector<MPI_Status> requestStatus(sendBuffSize); 
     644  std::vector<int> sendBuff(sendBuffSize*2); 
    740645  int recvBuffSize = recvRank.size(); 
    741   int* recvBuff = new int [2]; 
     646  std::vector<int> recvBuff(recvBuffSize*2,0); 
     647 
     648  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize); 
     649  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize); 
     650 
     651  int nRequest = 0; 
     652  for (int idx = 0; idx < recvBuffSize; ++idx) 
     653  { 
     654    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT, 
     655              recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]); 
     656    ++nRequest; 
     657  } 
    742658 
    743659  for (int idx = 0; idx < sendBuffSize; ++idx) 
     
    751667  { 
    752668    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT, 
    753               sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[idx]); 
    754   } 
    755  
    756   MPI_Status status; 
     669              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]); 
     670    ++nRequest; 
     671  } 
     672 
     673  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]); 
    757674  int nbRecvRank = 0, nbRecvElements = 0; 
     675  recvNbRank.clear(); 
     676  recvNbElements.clear(); 
    758677  for (int idx = 0; idx < recvBuffSize; ++idx) 
    759678  { 
    760     MPI_Recv(recvBuff, 2, MPI_INT, 
    761              recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &status); 
    762     nbRecvRank += *(recvBuff); 
    763     nbRecvElements += *(recvBuff+1); 
    764   } 
    765  
    766   MPI_Waitall(sendBuffSize, &request[0], &requestStatus[0]); 
    767  
    768   recvNbRank = nbRecvRank; 
    769   recvNbElements = nbRecvElements; 
    770  
    771   delete [] sendBuff; 
    772   delete [] recvBuff; 
    773 } 
    774  
    775 } 
     679    if (0 != recvBuff[2*idx]) 
     680    { 
     681      recvNbRank.push_back(recvRank[idx]); 
     682      recvNbElements.push_back(recvBuff[2*idx+1]); 
     683    } 
     684  } 
     685} 
     686 
     687} 
Note: See TracChangeset for help on using the changeset viewer.