Ignore:
Timestamp:
10/06/15 17:17:10 (9 years ago)
Author:
mhnguyen
Message:

First implementation of hierarchical distributed hashed table

+) Implement dht for int with index of type size_t

Test
+) Local
+) Work correctly

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/client_server_mapping_distributed.cpp

    r630 r720  
    1212#include <boost/functional/hash.hpp> 
    1313#include "utils.hpp" 
     14#include "client_client_dht.hpp" 
     15#include "mpi_tag.hpp" 
    1416 
    1517namespace xios 
     
    1921                                                                 const MPI_Comm& clientIntraComm, bool isDataDistributed) 
    2022  : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0), 
    21     indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed) 
     23    indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed), 
     24    ccDHT_(0) 
    2225{ 
    2326  clientIntraComm_ = clientIntraComm; 
     
    2528  MPI_Comm_rank(clientIntraComm,&clientRank_); 
    2629  computeHashIndex(); 
    27   computeDistributedServerIndex(globalIndexOfServer, clientIntraComm); 
     30 
     31  ccDHT_ = new CClientClientDHT(globalIndexOfServer, 
     32                                clientIntraComm, 
     33                                isDataDistributed); 
     34//  const boost::unordered_map<size_t,int>& globalIndexToServerMappingTmp = clientDht.getGlobalIndexServerMapping(); 
     35//  globalIndexToServerMapping_ = clientDht.getGlobalIndexServerMapping(); 
     36 
     37 
     38 
     39//  computeDistributedServerIndex(globalIndexOfServer, clientIntraComm); 
    2840} 
    2941 
    3042CClientServerMappingDistributed::~CClientServerMappingDistributed() 
    3143{ 
     44  if (0 != ccDHT_) delete ccDHT_; 
    3245} 
    3346 
     
    3851void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient) 
    3952{ 
     53  ccDHT_->computeServerIndexMapping(globalIndexOnClient); 
     54  indexGlobalOnServer_ = ccDHT_->getGlobalIndexOnServer(); 
     55 
     56/* 
    4057  size_t ssize = globalIndexOnClient.numElements(), hashedIndex; 
    4158 
     
    177194  delete [] sendBuff; 
    178195  delete [] recvBuff; 
     196*/ 
    179197} 
    180198 
     
    348366 
    349367  // Probing for global index 
    350   MPI_Iprobe(MPI_ANY_SOURCE, 15, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal); 
     368  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal); 
    351369  if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount)) 
    352370  { 
     
    354372    indexGlobalBuffBegin_.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_)); 
    355373    MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG, 
    356               statusIndexGlobal.MPI_SOURCE, 15, clientIntraComm_, 
     374              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_, 
    357375              &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]); 
    358376    countIndexGlobal_ += count; 
     
    373391 
    374392  // Probing for server index 
    375   MPI_Iprobe(MPI_ANY_SOURCE, 12, clientIntraComm_, &flagIndexServer, &statusIndexServer); 
     393  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO_0, clientIntraComm_, &flagIndexServer, &statusIndexServer); 
    376394  if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount)) 
    377395  { 
     
    379397    indexServerBuffBegin_.insert(std::make_pair<int, int*>(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_)); 
    380398    MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT, 
    381               statusIndexServer.MPI_SOURCE, 12, clientIntraComm_, 
     399              statusIndexServer.MPI_SOURCE, MPI_DHT_INFO_0, clientIntraComm_, 
    382400              &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]); 
    383401 
     
    400418  requestSendIndexGlobal.push_back(request); 
    401419  MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG, 
    402             clientDestRank, 15, clientIntraComm, &(requestSendIndexGlobal.back())); 
     420            clientDestRank, MPI_DHT_INDEX_0, clientIntraComm, &(requestSendIndexGlobal.back())); 
    403421} 
    404422 
     
    417435  requestSendIndexServer.push_back(request); 
    418436  MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT, 
    419             clientDestRank, 12, clientIntraComm, &(requestSendIndexServer.back())); 
     437            clientDestRank, MPI_DHT_INFO_0, clientIntraComm, &(requestSendIndexServer.back())); 
    420438} 
    421439 
Note: See TracChangeset for help on using the changeset viewer.