source: XIOS/dev/dev_olga/src/client_client_dht_template_impl.hpp @ 1077

Last change on this file since 1077 was 1030, checked in by oabramkina, 7 years ago

dev: intermediate commit.

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hash table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}

/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the indices
are redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the indices
are redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}
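
// Illustrative usage sketch (hypothetical names; InfoType is shown as int and H stands for
// whatever hierarchy policy the template is instantiated with):
//
//   CClientClientDHTTemplate<int,H>::Index2InfoTypeMap indexInfo;
//   indexInfo[someGlobalIndex] = someLocalInfo;                   // pairs owned by this process
//   CClientClientDHTTemplate<int,H> dht(indexInfo, clientIntraComm);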

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute the mapping between indices and the information corresponding to these indices
  \param [in] indices indices held by this process
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}
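
// Illustrative lookup sketch (hypothetical variable names; getInfoIndexMap() is assumed to be
// the accessor declared in client_client_dht_template.hpp that exposes indexToInfoMappingLevel_):
//
//   CArray<size_t,1> indicesToResolve(n);          // global indices this process wants to resolve
//   dht.computeIndexInfoMapping(indicesToResolve);
//   const Index2VectorInfoTypeMap& found = dht.getInfoIndexMap();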

/*!
    Compute the mapping between indices and the information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
   \param [in] indices indices held by this process
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose target clients (the ones holding their mapping) are other processes
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

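  // The first pass above counted, per target client, the number of distinct indices whose hash
  // falls into that client's range; the second pass below allocates one send buffer per target
  // and copies each distinct index into it (nbIndices acts as a "seen once" marker).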
  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<MPI_Request> request;
  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                             itbRecvNbIndex = recvNbIndexClientCount.begin(),
                             itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // Go to the next level and recurse on the received indices
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // We are in the last level, where the necessary mappings reside.
    indexToInfoMappingLevel_= (index2InfoMapping_);

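  // At this point indexToInfoMappingLevel_ contains the resolved index->info entries for the
  // indices this process received at this level (resolved recursively at lower levels, or taken
  // directly from index2InfoMapping_ at the last level). Next, count for each requesting process
  // how many (index, info) pairs will be returned.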
  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

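  // Reply phase: for every process that queried this one, pack the answers. Each requested index
  // that was found is written once per associated info value into the index buffer, and the
  // matching info values are packed contiguously into the info buffer, so both buffers stay
  // aligned element by element.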
  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);

  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                                it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                                         it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned one range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute range of hash index for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
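
// Illustrative sketch (assumed nbClient = 4, max = std::numeric_limits<size_t>::max()):
// the vector produced above holds nbClient+1 boundaries, roughly
//   hashedIndex = { 0, max/4, 2*(max/4), 3*(max/4), max }
// and client i is responsible for every hashed value h with hashedIndex[i] <= h < hashedIndex[i+1];
// callers locate the owner with std::upper_bound on this vector.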

/*!
  Compute the distribution of global indices (and their information, usually a server index) among the clients.
  Each client already holds a piece of information and its associated indices.
This information is redistributed among the processes by projecting the indices onto the size_t space,
and the corresponding information is distributed along with them.
After the redistribution, each client holds its rearranged indices and the corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(), it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

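  // Packing phase: for every (index, info list) pair, each info value is packed into the
  // per-destination info buffer with ProcessDHTElement<InfoType>::packElement, and the index is
  // duplicated once per info value in the per-destination index buffer, keeping the two buffers
  // aligned.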
  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Work out from how many clients each client receives messages,
  // and the size of the buffers needed to receive them
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and the corresponding information that do not belong to it,
  // it sends them to the clients that own them.
  // Each message contains the indices and their corresponding information.
  std::vector<MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                                it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Recurse on the next level if there is one; otherwise store the local part of the DHT
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendIndex list of send requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const MPI_Comm& clientIntraComm,
                                                       std::vector<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive a message containing indices from a client
  \param [in] clientSrcRank rank of the source client
  \param [in] indices array in which the received indices are stored
  \param [in] indiceSize size of the index array to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const MPI_Comm& clientIntraComm,
                                                         std::vector<MPI_Request>& requestRecvIndex)
{
  MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize info array size to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendInfo list of send requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const MPI_Comm& clientIntraComm,
                                                      std::vector<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);

  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of the source client
  \param [in] info info array to receive
  \param [in] infoSize info array size to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const MPI_Comm& clientIntraComm,
                                                        std::vector<MPI_Request>& requestRecvInfo)
{
  MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}
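
// Note on the request handling used by the four helpers above: a fresh MPI_Request is pushed
// into the caller's vector and MPI_Isend/MPI_Irecv writes the request handle into that element.
// Only the handle value is stored, so later push_back reallocations are harmless; the caller
// completes all outstanding operations with a single MPI_Waitall on the vector.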

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int offset;
  size_t e,f;

  // Simple interval intersection: [a,b] against [c,d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}
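
// Illustrative sketch of the intersection logic above (assumed, hypothetical numbers): rank r of
// the current level owns the hash interval [a,b] = [hashedIndexGroup[r-groupBegin],
// hashedIndexGroup[r-groupBegin+1]-1]. A rank i of a parent group, owning [c,d] there, must send
// its data to r whenever the two intervals overlap, i.e. unless d < a or b < c; the overlapping
// ranks populate recvRank_. Symmetrically, [e,f] is this rank's interval inside its parent group,
// and every rank of the current level whose interval intersects [e,f] is added to sendRank_.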

/*!
  Compute the number of clients, and the corresponding number of elements, each client will receive when the search results are returned
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive, during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

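  // Each exchanged message is a pair of ints: element 0 is a flag telling whether the sender
  // actually has indices for the receiver, element 1 is the number of indices. Only entries whose
  // flag is non-zero are kept in recvNbRank/recvNbElements below.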
  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}

}