source: XIOS/dev/branch_yushan/src/client_client_dht_template_impl.hpp @ 1070

Last change on this file since 1070 was 1070, checked in by yushan, 7 years ago

Preparation for merge from trunk

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hash table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"
#ifdef _usingEP
#include "ep_declaration.hpp"
#endif


namespace xios
{
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}
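
// Note: the hierarchy policy H (the base class constructed with clientIntraComm) provides the
// level machinery used throughout this file: computeMPICommLevel(), getNbLevel(), getGroupBegin(),
// getNbInGroup() and internalComm_. Every constructor therefore sizes sendRank_ and recvRank_ with
// one slot per hierarchy level before any index exchange takes place.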

/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information together with its attached index; the indices
  are redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial index-to-information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);

  // Promote the scalar mapping to the vector-valued form used internally
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it)
  {
    indexToVecInfoMap[it->first].push_back(it->second);
  }

  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information together with its attached indices; the indices
  are redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial index-to-information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}
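
// Illustrative usage sketch (not part of the original source). It assumes InfoType = int,
// a hierarchy policy class named HierarchyPolicy (placeholder name), and that the computed
// mapping is read back through an accessor declared in client_client_dht_template.hpp,
// which is not shown in this file:
//
//   CClientClientDHTTemplate<int, HierarchyPolicy>::Index2VectorInfoTypeMap localMap;
//   localMap[someGlobalIndex].push_back(owningServerRank);              // what this process knows
//   CClientClientDHTTemplate<int, HierarchyPolicy> dht(localMap, comm); // redistribute over hashed size_t space
//   dht.computeIndexInfoMapping(queriedIndices);                        // ask which info is attached to these indices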

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
  Compute the mapping between indices and their corresponding information
  for each level of the hierarchical DHT. Recursive function.
  \param [in] indices indices a proc has
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const ep_lib::MPI_Comm& commLevel,
                                                                 int level)
{
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose hash ranges are owned by other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<ep_lib::MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  std::vector<ep_lib::MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // Recurse: push the received indices down to the next level of the hierarchy
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // At the last level, the locally stored mapping is the one to answer from
    indexToInfoMappingLevel_ = index2InfoMapping_;

  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<ep_lib::MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

  // Build the replies: for each received index found locally, send back the index once per
  // attached info element, together with the packed info itself
  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  // Unpack the replies into the mapping returned for this level
  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                                it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                                         it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}
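
// Overview of computeIndexInfoMappingLevel: (1) each queried index is hashed and sent to the
// client of the current level whose hash range contains it; (2) the received indices are pushed
// recursively down to the lowest level, where index2InfoMapping_ holds the stored entries;
// (3) on the way back up, every owner replies with the matching (index, info) pairs, which are
// unpacked into indexToInfoMappingLevel_.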

/*!
  Compute the hash index distribution over the whole size_t space; each client is then
  responsible for one contiguous range of this distribution.
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
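
// Worked example (illustrative only, with a pretend maximum of 11 instead of SIZE_MAX and
// nbClient = 3): the quotient is 3 and the remainder 2, so the boundaries come out as
// hashedIndex = {0, 4, 7, 11} and client k handles the hashed values from hashedIndex[k]
// up to hashedIndex[k+1]. Routing then uses std::upper_bound: a value hashed to 5 sits
// strictly below the boundary 7, whose position minus one gives client 1, the owner of [4, 7).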

/*!
  Compute the distribution of global indices among clients.
  Each client already holds a piece of information and its associated indices.
  This information is redistributed among processes by projecting the indices onto the size_t space;
  the corresponding information is distributed along with them.
  After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const ep_lib::MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(), it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Calculate from how many clients each client receives messages,
  // and the size of the buffers needed to receive them
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices whose hash ranges belong to other clients,
  // it sends them to the correct clients.
  // The message contents are the indices and their corresponding information.
  std::vector<ep_lib::MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);

      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
  {
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  }

  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
  {
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
  }

  std::vector<ep_lib::MPI_Status> status(request.size());

  MPI_Waitall(request.size(), &request[0], &status[0]);

  // Insert every received (index, info) pair into the local mapping
  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                                it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Recurse to the next level; at the last level, the result becomes the stored mapping
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}
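
// Note on serialization: info values travel as raw bytes. Each element occupies
// ProcessDHTElement<InfoType>::typeSize() bytes; packElement writes one element at the current
// byte offset and advances it, and unpackElement reads it back the same way, so an info buffer
// paired with N indices is always exactly N*typeSize() bytes long.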

/*!
  Send a message containing indices to another client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
{
  ep_lib::MPI_Request request;
  requestSendIndex.push_back(request);

  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive a message containing indices from another client
  \param [in] clientSrcRank rank of source client
  \param [in] indices buffer in which the received indices are stored
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
{
  ep_lib::MPI_Request request;
  requestRecvIndex.push_back(request);

  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send a message containing information to another client
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
{
  ep_lib::MPI_Request request;
  requestSendInfo.push_back(request);
  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of source client
  \param [in] info buffer in which the received info array is stored
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
{
  ep_lib::MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}
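
// Note: every exchange above is a pair of matched non-blocking calls. Indices travel as
// MPI_UNSIGNED_LONG under the MPI_DHT_INDEX tag and packed info as MPI_CHAR under the
// MPI_DHT_INFO tag; each call appends its request to the caller's request vector, which is
// resolved by a single MPI_Waitall, so send and receive counts must match on both sides.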

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table
  (e.g. the number of processes per level).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int offset;
  size_t e,f;

  // Simple interval arithmetic: does [a,b] intersect [c,d]?
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset < nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}
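
// Example of the overlap test: with [a,b] = [4,6], a parent range [c,d] = [5,10] intersects it
// (neither 10 < 4 nor 6 < 5 holds), so its owner is added to recvRank_; a range [0,3] does not,
// because 3 < 4. The ranges are closed on both ends since each upper bound is taken as the next
// boundary minus one.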

/*!
  Compute the number of clients, as well as the corresponding number of elements, each client will
  receive when the search results are returned
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[idx], 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[idx], 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive, for each process, the number of processes it needs to listen to and the
  number of indices it will receive during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[2*idx], 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Barrier(this->internalComm_);

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);

  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}
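
// Note on the wire format used by sendRecvRank: each potential partner receives a pair of ints
// {flag, count}, where flag is the caller's 0/1 "will send" marker for that rank (this is what
// both call sites in this file pass as sendNbRank) and count is the number of indices that will
// follow. Receivers keep only the entries whose flag is non-zero, which yields the
// recvNbRank/recvNbElements lists driving the actual index exchange.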

}