source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 842

Last change on this file since 842 was 839, checked in by mhnguyen, 8 years ago

Correcting a bug in dht

+) The message exchange (MPI_Isend, MPI_Irecv) must be completed at each level
+) If no corresponding information is found, dht returns an empty array
+) Remove some redundant code

Test
+) On Curie
+) Up to 40 cores (3 levels)
+) All tests pass

File size: 26.5 KB
/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
/*!
  Constructor with initial distribution information and the corresponding index
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_()
{
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}
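
// Minimal usage sketch (illustrative only: the concrete info type, the
// hierarchy policy class and the way the result is read back are assumptions,
// not taken from this file):
//
//   boost::unordered_map<size_t,int> indexInfo;          // locally held index -> info
//   indexInfo[42] = myServerRank;
//   CClientClientDHTTemplate<int,SomeHierarchyPolicy> dht(indexInfo, clientComm);
//
//   CArray<size_t,1> wanted(1);
//   wanted(0) = 42;
//   dht.computeIndexInfoMapping(wanted);                 // result stored in indexToInfoMappingLevel_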

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
    Compute the mapping between indices and the corresponding information
for each level of the hierarchical DHT. Recursive function
   \param [in] indices indices a proc has
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping server is on another client
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    ++sendNbIndexBuff[indexClient];
  }

  std::map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
    ++sendNbIndexBuff[indexClient];
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<MPI_Request> request;
  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
                           itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                    iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // Go down to the next level and recurse
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // We are at the last level, where the necessary mappings reside.
    indexToInfoMappingLevel_ = (index2InfoMapping_);

  typename Index2InfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap) ++sendNbIndexOnReturn[idx];
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

  std::map<int,unsigned char*> client2ClientInfoOnReturn;
  std::map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
          ProcessDHTElement<InfoType>::packElement(itIndexToInfoMap->second, tmpInfoPtr, infoIndex);
          ++nb;
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
  int infoIndex = 0;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex);
  }

  indexToInfoMappingLevel_ = (indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (std::map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                            it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (std::map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                    it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (std::map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                            it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}
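
// Summary of the exchange pattern implemented above: at each level every process
//   1) hashes its requested indices and sends them to the ranks owning the
//      matching hash ranges (MPI_DHT_INDEX messages),
//   2) recurses with the indices it received until the last level, where
//      index2InfoMapping_ holds the local piece of the table,
//   3) sends back, for every received index that was found, the index and its
//      packed info (MPI_DHT_INDEX / MPI_DHT_INFO); indices that are not found
//      are simply omitted, so the caller may receive an empty result.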

/*!
  Compute the hash index distribution over the whole size_t space; each client then owns one range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices owned by each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
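
// Example of the resulting partition (values rounded, shown for nbClient = 4,
// with max = std::numeric_limits<size_t>::max()):
//   hashedIndex = { 0, max/4, 2*max/4, 3*max/4, max }
// A hashed value h is then owned by client
//   std::distance(hashedIndex.begin(), std::upper_bound(hashedIndex.begin(), hashedIndex.end(), h)) - 1
// which is exactly how computeIndexInfoMappingLevel and computeDistributedIndex
// pick the destination rank of each index.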

/*!
  Compute the distribution of the global indices (and their associated information) among the clients
  Each client already holds a piece of information and its associated index.
This information will be redistributed among the processes by projecting the indices into size_t space;
the corresponding information is distributed over that space in the same way.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                            const MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(),it,
                                                                 ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    ++sendNbIndexBuff[indexClient];
  }

  std::map<int, size_t*> client2ClientIndex;
  std::map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
    ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
    ++sendNbIndexBuff[indexClient];
  }

  // Calculate from how many clients each client receives a message
  // and the size of the buffer needed to receive it
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and the corresponding information which do not belong to it,
  // it sends a message to the correct clients.
  // The contents of the message are the indices and their corresponding information
  std::vector<MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                    iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  std::map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                          iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)] = infoValue;
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (std::map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                    it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (std::map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                            it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Recurse on the next level
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_ = (indexToInfoMapping);
}
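
// Tiny example of the redistribution (one level, 2 clients, illustrative
// values): client 0 holds {index 7 -> info A}, client 1 holds {index 3 -> info B}.
// If hashGlobalIndex(7) falls into client 1's hash range and hashGlobalIndex(3)
// into client 0's, then after computeDistributedIndex client 0 stores the entry
// for index 3 and client 1 the entry for index 7 in index2InfoMapping_, so a
// later lookup of any index is answered by exactly one well-defined rank.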

/*!
  Send a message containing indices to another client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const MPI_Comm& clientIntraComm,
                                                       std::vector<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive a message containing indices from another client
  \param [in] clientSrcRank rank of source client
  \param [in] indices buffer in which the indices are received
  \param [in] indiceSize size of the index array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const MPI_Comm& clientIntraComm,
                                                         std::vector<MPI_Request>& requestRecvIndex)
{
  MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send a message containing information to another client
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const MPI_Comm& clientIntraComm,
                                                      std::vector<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);

  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of source client
  \param [in] info buffer in which the info array is received
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const MPI_Comm& clientIntraComm,
                                                        std::vector<MPI_Request>& requestRecvInfo)
{
  MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes)
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int currentGroup, offset;
  size_t e,f;

  // Simple interval intersection test: [a,b] against [c,d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}
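
// Worked example of the intersection test above (made-up small numbers for
// readability; real values span the whole size_t range): if this rank owns the
// hash range [a,b] = [25,49] and a parent-level rank owns [c,d] = [40,79],
// then neither d < a nor b < c holds, the ranges overlap, and that parent rank
// is added to recvRank_[level]; a disjoint range such as [50,79] would be skipped.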

/*!
  Compute the number of clients as well as the corresponding number of elements each client will receive when the search result is returned
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive, during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}
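
// Each message exchanged above carries two ints per destination rank:
//   sendBuff[2*idx]   -> participation flag (the caller sets its sendBuff entry
//                        to 1 when at least one index is destined for that rank)
//   sendBuff[2*idx+1] -> number of indices that will follow in the later
//                        MPI_DHT_INDEX / MPI_DHT_INFO messages
// Receivers keep only the ranks whose flag is non-zero, which is how
// recvRankClient and recvNbIndexClientCount are built in the callers.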

}