source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 843

Last change on this file since 843 was 843, checked in by mhnguyen, 8 years ago

Several improvements

+) Replace some time-consuming operations by simpler ones

Test
+) On Curie
+) All tests pass

File size: 27.1 KB
/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
/*!
  Constructor with initial distribution information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached indices; the indices
are redistributed (projected) into the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
    Compute mapping between indices and information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
   \param [in] indices indices a proc has
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping information is held by other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    ++sendNbIndexBuff[indexClient];
  }

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
        ++sendNbIndexBuff[indexClient];
      }
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<MPI_Request> request;
  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
                           itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // Recurse into the next level of the hierarchy
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // We are now at the last level, where the necessary mappings reside.
    indexToInfoMappingLevel_= (index2InfoMapping_);

  typename Index2InfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap) ++sendNbIndexOnReturn[idx];
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
          ProcessDHTElement<InfoType>::packElement(itIndexToInfoMap->second, tmpInfoPtr, infoIndex);
          ++nb;
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping); //indexToInfoMappingLevel_ = (indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                               it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                            it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}
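
/*
  The first half of computeIndexInfoMappingLevel above follows a two-pass bucketing
  pattern: one pass counts how many indices fall to each destination rank so that each
  per-destination send buffer can be allocated exactly once, and a second pass fills
  those buffers. The block below is a minimal, self-contained sketch of the same idea
  (illustrative only, not part of XIOS, kept out of compilation with #if 0); findOwner
  stands in for the upper_bound lookup on the hashed ranges.
*/
#if 0
#include <cstddef>
#include <map>
#include <vector>

// Bucket 'indices' by owner rank in two passes, as the DHT code does.
std::map<int, std::vector<size_t> > bucketByOwner(const std::vector<size_t>& indices,
                                                  int (*findOwner)(size_t))
{
  std::map<int, std::vector<size_t> > buckets;
  std::map<int, size_t> counts;

  // Pass 1: count how many indices each destination rank will receive.
  for (size_t i = 0; i < indices.size(); ++i)
    ++counts[findOwner(indices[i])];

  // Allocate each destination buffer once, with its exact size.
  for (std::map<int, size_t>::const_iterator it = counts.begin(); it != counts.end(); ++it)
    buckets[it->first].reserve(it->second);

  // Pass 2: fill the buffers.
  for (size_t i = 0; i < indices.size(); ++i)
    buckets[findOwner(indices[i])].push_back(indices[i]);

  return buckets;
}
#endif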

/*!
  Compute the hash index distribution of the whole size_t space; each client is then assigned one range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute range of hash index for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
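
/*
  A minimal, self-contained sketch of how the partition built by computeHashIndex is
  consumed: hashedIndex holds nbClient+1 ascending boundaries of the size_t space, and
  the owner of a hashed value is located with std::upper_bound on those boundaries,
  mirroring the lookups in computeIndexInfoMappingLevel and computeDistributedIndex.
  Illustrative only, not part of XIOS; kept out of compilation with #if 0.
*/
#if 0
#include <algorithm>
#include <cstddef>
#include <limits>
#include <vector>

int ownerOfHash(const std::vector<size_t>& hashedIndex, size_t hashedVal)
{
  // hashedIndex[k] <= hashedVal < hashedIndex[k+1]  =>  owner is client k.
  std::vector<size_t>::const_iterator it =
    std::upper_bound(hashedIndex.begin(), hashedIndex.end(), hashedVal);
  return static_cast<int>(std::distance(hashedIndex.begin(), it)) - 1;
}

// Example: with 4 clients the boundaries split [0, SIZE_MAX] into 4 nearly equal
// ranges, so a value just above the first boundary belongs to client 1.
void ownerOfHashExample()
{
  size_t maxVal = std::numeric_limits<size_t>::max();
  std::vector<size_t> hashedIndex(5);
  for (int i = 0; i < 5; ++i) hashedIndex[i] = (maxVal / 4) * i;
  hashedIndex[4] = maxVal;
  int owner = ownerOfHash(hashedIndex, maxVal / 4 + 1);  // owner == 1
  (void)owner;
}
#endif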

/*!
  Compute the distribution of global indices (and their associated info, usually a server index) among the clients.
  Each client already holds a piece of information and its associated indices.
This information is redistributed among the processes by projecting the indices into the size_t space;
the corresponding information is distributed along with them.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                            const MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(),it,
                                                                 ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        ++sendNbIndexBuff[indexClient];
      }
    }
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
        ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
        ++sendNbIndexBuff[indexClient];
      }
    }
  }

  // Calculate from how many clients each client receives messages
  // and the size of the buffers needed to receive them.
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and their corresponding information that do not belong to it,
  // it sends them to the clients that own them.
  // Each message contains the indices and their corresponding information.
  std::vector<MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)] = infoValue;
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                               it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Recurse into the next level of the hierarchy
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping); //index2InfoMapping_ = (indexToInfoMapping);
}
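
/*
  ProcessDHTElement<InfoType> (declared elsewhere, presumably in utils.hpp, which is
  included above) provides the typeSize(), packElement() and unpackElement() calls used
  throughout this file. The block below is only a guess at what a specialization for a
  trivially copyable info type could look like, consistent with how the calls are used
  above (the byte index is advanced by typeSize() per element); it is NOT the actual
  XIOS definition and is kept out of compilation with #if 0.
*/
#if 0
#include <cstring>

// Assuming a primary template along the lines of: template<typename T> struct ProcessDHTElement;
template<>
struct ProcessDHTElement<int>
{
  static int typeSize() { return sizeof(int); }

  // Copy one element into buff at position index and advance index.
  static void packElement(const int& el, unsigned char* buff, int& index)
  {
    std::memcpy(buff + index, &el, sizeof(int));
    index += sizeof(int);
  }

  // Read one element from buff at position index and advance index.
  static void unpackElement(int& el, unsigned char* buff, int& index)
  {
    std::memcpy(&el, buff + index, sizeof(int));
    index += sizeof(int);
  }
};
#endif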

/*!
  Send a message containing indices to a destination client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of index array to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const MPI_Comm& clientIntraComm,
                                                       std::vector<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive a message containing indices from another client
  \param [in] clientSrcRank rank of the source client
  \param [out] indices buffer in which the received indices are stored
  \param [in] indiceSize number of indices to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const MPI_Comm& clientIntraComm,
                                                         std::vector<MPI_Request>& requestRecvIndex)
{
  MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send message containing information to clients
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize info array size to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const MPI_Comm& clientIntraComm,
                                                      std::vector<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);

  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive message containing information from other clients
  \param [in] clientSrcRank rank of the source client
  \param [out] info info array to receive
  \param [in] infoSize info array size to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const MPI_Comm& clientIntraComm,
                                                        std::vector<MPI_Request>& requestRecvInfo)
{
  MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}
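
/*
  The four send/receive helpers above share one pattern: push a fresh MPI_Request onto a
  shared vector, start a non-blocking MPI_Isend/MPI_Irecv whose handle is written into the
  vector's back, and complete everything later with a single MPI_Waitall. The block below
  is a minimal stand-alone sketch of that pattern (illustrative only, not XIOS code, kept
  out of compilation with #if 0); the tag value is a stand-in for the MPI_DHT_* tags
  defined in mpi_tag.hpp.
*/
#if 0
#include <mpi.h>
#include <vector>

// Each rank sends its own rank to the next rank and receives from the previous one,
// using the same push_back + Isend/Irecv + Waitall pattern as above.
int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  const int TAG_EXAMPLE = 7;  // stand-in for the MPI_DHT_* tags
  int sendVal = rank, recvVal = -1;
  int next = (rank + 1) % size, prev = (rank + size - 1) % size;

  std::vector<MPI_Request> requests;

  requests.push_back(MPI_Request());
  MPI_Irecv(&recvVal, 1, MPI_INT, prev, TAG_EXAMPLE, MPI_COMM_WORLD, &requests.back());

  requests.push_back(MPI_Request());
  MPI_Isend(&sendVal, 1, MPI_INT, next, TAG_EXAMPLE, MPI_COMM_WORLD, &requests.back());

  std::vector<MPI_Status> statuses(requests.size());
  MPI_Waitall(static_cast<int>(requests.size()), &requests[0], &statuses[0]);

  MPI_Finalize();
  return 0;
}
#endif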

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int currentGroup, offset;
  size_t e,f;

  // Simple interval intersection test: [a,b] with [c,d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}
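
/*
  The receive-rank loop above reduces to a closed-interval overlap test: [a,b] and [c,d]
  intersect iff !(d < a || b < c). A tiny self-contained sketch with a worked example
  (illustrative only, kept out of compilation with #if 0):
*/
#if 0
#include <cstddef>

// True when the closed ranges [a,b] and [c,d] share at least one value.
bool intervalsOverlap(size_t a, size_t b, size_t c, size_t d)
{
  return !((d < a) || (b < c));
}

// Example: a rank owning hash range [100,199] listens to a parent-level rank owning
// [150,249] (overlap) but not to one owning [200,299] (no overlap, since the upper
// bounds are inclusive).
void intervalsOverlapExample()
{
  bool listens  = intervalsOverlap(100, 199, 150, 249);  // true
  bool ignores  = intervalsOverlap(100, 199, 200, 299);  // false
  (void)listens; (void)ignores;
}
#endif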

/*!
  Compute the number of clients as well as the number of elements each of them will receive when the search results are returned
  \param [in] sendNbRank Rank of clients to send to
  \param [in] sendNbElements Number of elements to send to each client
  \param [in] recvNbRank Rank of clients to receive from
  \param [out] recvNbElements Number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank Rank of clients to send to
  \param [in] sendNbElements Number of elements to send to each client
  \param [out] recvNbRank Rank of clients to receive from
  \param [out] recvNbElements Number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}

}