source: XIOS/dev/branch_openmp/src/client_client_dht_template_impl.hpp @ 1545

Last change on this file since 1545 was 1545, checked in by yushan, 6 years ago

branch_openmp merged with trunk r1544

File size: 33.7 KB
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 March 2016
6
7   \brief Distributed hashed table implementation.
8 */
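/*
   Overview. The DHT works in two phases: at construction, each process hashes its local
   (index -> info) pairs and ships every pair to the process whose hash range owns it
   (computeDistributedIndex); later, computeIndexInfoMapping() takes an arbitrary set of
   indices, queries the owning processes and stores the answers in indexToInfoMappingLevel_.
   A minimal usage sketch; it assumes H is the hierarchy/policy base class expected by the
   template, that the result is read back through an accessor declared in the class header
   (not in this file), and that the names local, clientIntraComm and nbWanted are placeholders:

     Index2InfoTypeMap local;                       // index -> info (e.g. server rank) held locally
     local[42] = someInfo;                          // hypothetical entry
     CClientClientDHTTemplate<InfoType, H> dht(local, clientIntraComm);
     CArray<size_t,1> wanted(nbWanted);             // indices whose info this process needs
     dht.computeIndexInfoMapping(wanted);           // fills indexToInfoMappingLevel_
*/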
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12
13namespace xios
14{
15template<typename T, typename H>
16CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
17  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
18{
19  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
20  this->computeMPICommLevel();
21  int nbLvl = this->getNbLevel();
22  sendRank_.resize(nbLvl);
23  recvRank_.resize(nbLvl);
24}
25
26/*!
27  Constructor with an initial distribution of information and the corresponding indices.
28  Each client (process) holds a piece of information as well as the attached index; the indices
29will be redistributed (projected) onto the size_t space along with the associated
30information.
31  \param [in] indexInfoMap initial index and information mapping
32  \param [in] clientIntraComm communicator of clients
33*/
34template<typename T, typename H>
35CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
36                                                        const ep_lib::MPI_Comm& clientIntraComm)
37  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
38{
39  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
40  this->computeMPICommLevel();
41  int nbLvl = this->getNbLevel();
42  sendRank_.resize(nbLvl);
43  recvRank_.resize(nbLvl);
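  // Convert the scalar (index -> info) map into the vector form (one-element vectors),
  // so that this constructor can reuse the same computeDistributedIndex() as the
  // vector-based constructor below.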
44  Index2VectorInfoTypeMap indexToVecInfoMap;
45  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
46  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
47  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
48  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
49}
50
51/*!
52  Constructor with an initial distribution of information and the corresponding indices.
53  Each client (process) holds a piece of information as well as the attached index; the indices
54will be redistributed (projected) onto the size_t space along with the associated
55information.
56  \param [in] indexInfoMap initial index and information mapping
57  \param [in] clientIntraComm communicator of clients
58*/
59template<typename T, typename H>
60CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
61                                                        const ep_lib::MPI_Comm& clientIntraComm)
62  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
63{
64  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
65  this->computeMPICommLevel();
66  int nbLvl = this->getNbLevel();
67  sendRank_.resize(nbLvl);
68  recvRank_.resize(nbLvl);
69  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
70}
71
72template<typename T, typename H>
73CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
74{
75}
76
77/*!
78  Compute the mapping between indices and the information corresponding to these indices
79  \param [in] indices indices whose information the calling process needs
80*/
81template<typename T, typename H>
82void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
83{
84  int nbLvl = this->getNbLevel();
85  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
86}
87
88/*!
89    Compute the mapping between indices and the information corresponding to these indices
90for each level of the hierarchical DHT. Recursive function.
91   \param [in] indices indices whose information the calling process needs
92   \param [in] commLevel communicator of the current level
93   \param [in] level current level
94*/
95template<typename T, typename H>
96void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
97                                                                 const ep_lib::MPI_Comm& commLevel,
98                                                                 int level)
99{
100  int clientRank;
101  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
102  int groupRankBegin = this->getGroupBegin()[level];
103  int nbClient = this->getNbInGroup()[level];
104  std::vector<size_t> hashedIndex;
105  computeHashIndex(hashedIndex, nbClient);
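  // Ownership rule: client r of this level owns the hash range [hashedIndex[r], hashedIndex[r+1]);
  // the owner of a global index is located below with std::upper_bound on its hashed value.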
106
107  size_t ssize = indices.numElements(), hashedVal;
108
109  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
110                                      iteClientHash = hashedIndex.end();
111  std::vector<int> sendBuff(nbClient,0);
112  std::vector<int> sendNbIndexBuff(nbClient,0);
113
114  // Number of global indices whose mapped server is held by another client
115  int nbIndexToSend = 0;
116  size_t index;
117  HashXIOS<size_t> hashGlobalIndex;
118  std::unordered_map<size_t,int> nbIndices;
119  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
120  for (int i = 0; i < ssize; ++i)
121  {
122    index = indices(i);
123    if (0 == nbIndices.count(index))
124    {
125      hashedVal  = hashGlobalIndex(index);
126      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
127      int indexClient = std::distance(itbClientHash, itClientHash)-1;
128      ++sendNbIndexBuff[indexClient];
129      nbIndices[index] = 1;
130    }
131  }
132
133  std::unordered_map<int, size_t* > client2ClientIndex;
134  for (int idx = 0; idx < nbClient; ++idx)
135  {
136    if (0 != sendNbIndexBuff[idx])
137    {
138      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
139      nbIndexToSend += sendNbIndexBuff[idx];
140      sendBuff[idx] = 1;
141      sendNbIndexBuff[idx] = 0;
142    }
143  }
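  // sendNbIndexBuff was reset to zero above and is reused below as a per-destination
  // write cursor into client2ClientIndex.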
144
145  for (int i = 0; i < ssize; ++i)
146  {
147    index = indices(i);
148    if (1 == nbIndices[index])
149    {
150      hashedVal  = hashGlobalIndex(index);
151      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
152      int indexClient = std::distance(itbClientHash, itClientHash)-1;
153      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
154      ++sendNbIndexBuff[indexClient];
155      ++nbIndices[index];
156    }
157  }
158
159  std::vector<int> recvRankClient, recvNbIndexClientCount;
160  sendRecvRank(level, sendBuff, sendNbIndexBuff,
161               recvRankClient, recvNbIndexClientCount);
162
163  int recvNbIndexCount = 0;
164  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
165    recvNbIndexCount += recvNbIndexClientCount[idx];
166
167  unsigned long* recvIndexBuff;
168  if (0 != recvNbIndexCount)
169    recvIndexBuff = new unsigned long[recvNbIndexCount];
170
171  int request_size = 0;
172  for (int idx = 0; idx < recvRankClient.size(); ++idx)
173  {
174    if (0 != recvNbIndexClientCount[idx])
175      request_size++;
176  }
177
178  request_size += client2ClientIndex.size();
179
180  std::vector<ep_lib::MPI_Request> request(request_size);
181 
182  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
183                             iteRecvIndex = recvRankClient.end(),
184                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
185                           itRecvNbIndex;
186  int currentIndex = 0;
187  int nbRecvClient = recvRankClient.size();
188  int request_position = 0;
189  for (int idx = 0; idx < nbRecvClient; ++idx)
190  {
191    if (0 != recvNbIndexClientCount[idx])
192      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
193    currentIndex += recvNbIndexClientCount[idx];
194  }
195
196  std::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
197                                                iteIndex = client2ClientIndex.end();
198  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
199    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);
200
201  std::vector<ep_lib::MPI_Status> status(request.size());
202  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);
203
204  CArray<size_t,1>* tmpGlobalIndex;
205  if (0 != recvNbIndexCount)
206    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
207  else
208    tmpGlobalIndex = new CArray<size_t,1>();
209
210  // If there is a lower level, forward the received indices there and recurse
211  if (0 < level)
212  {
213    --level;
214    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
215  }
216  else // Now we are at the last level, where the necessary mappings reside.
217    indexToInfoMappingLevel_ = index2InfoMapping_;
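  // At this point indexToInfoMappingLevel_ maps each index received above to its information:
  // it was either filled by the recursive call (which resolved the forwarded indices at the
  // lower levels) or taken directly from index2InfoMapping_ at the last level. The replies
  // sent back to the requesting clients below are built from it.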
218
219  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
220  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
221  currentIndex = 0;
222  for (int idx = 0; idx < nbRecvClient; ++idx)
223  {
224    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
225    {
226      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
227      if (iteIndexToInfoMap != itIndexToInfoMap)
228        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
229    }
230    currentIndex += recvNbIndexClientCount[idx];
231  }
232
233  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
234  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
235  int indexIndex = 0;
236  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
237  {
238    recvRankOnReturn[indexIndex] = itIndex->first;
239  }
240  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
241                   recvRankOnReturn, recvNbIndexOnReturn);
242
243  int recvNbIndexCountOnReturn = 0;
244  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
245    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
246
247  unsigned long* recvIndexBuffOnReturn;
248  unsigned char* recvInfoBuffOnReturn;
249  if (0 != recvNbIndexCountOnReturn)
250  {
251    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
252    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
253  }
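  // One packed InfoType of typeSize() bytes is returned per index entry, so entry k of
  // recvIndexBuffOnReturn corresponds to bytes [k*typeSize(), (k+1)*typeSize()) of
  // recvInfoBuffOnReturn; the unpacking loop at the end of this function relies on this alignment.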
254
255  int requestOnReturn_size = 0;
256  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
257  {
258    if (0 != recvNbIndexOnReturn[idx])
259    {
260      requestOnReturn_size += 2;
261    }
262  }
263
264  for (int idx = 0; idx < nbRecvClient; ++idx)
265  {
266    if (0 != sendNbIndexOnReturn[idx])
267    {
268      requestOnReturn_size += 2;
269    }
270  }
271
272  int requestOnReturn_position=0;
273
274  std::vector<ep_lib::MPI_Request> requestOnReturn(requestOnReturn_size);
275  currentIndex = 0;
276  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
277  {
278    if (0 != recvNbIndexOnReturn[idx])
279    {
280      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
281      recvInfoFromClients(recvRankOnReturn[idx],
282                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
283                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
284                          commLevel, &requestOnReturn[requestOnReturn_position++]);
285    }
286    currentIndex += recvNbIndexOnReturn[idx];
287  }
288
289  std::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
290  std::unordered_map<int,size_t*> client2ClientIndexOnReturn;
291  currentIndex = 0;
292  for (int idx = 0; idx < nbRecvClient; ++idx)
293  {
294    if (0 != sendNbIndexOnReturn[idx])
295    {
296      int rank = recvRankClient[idx];
297      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
298      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
299      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
300      int infoIndex = 0;
301      int nb = 0;
302      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
303      {
304        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
305        if (iteIndexToInfoMap != itIndexToInfoMap)
306        {
307          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
308          for (int k = 0; k < infoTmp.size(); ++k)
309          {
310            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
311            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
312            ++nb;
313          }
314        }
315      }
316
317      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
318                         sendNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
319      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
320                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, &requestOnReturn[requestOnReturn_position++]);
321    }
322    currentIndex += recvNbIndexClientCount[idx];
323  }
324
325  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
326  ep_lib::MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
327
328  Index2VectorInfoTypeMap indexToInfoMapping;
329  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
330  int infoIndex = 0;
331  InfoType unpackedInfo;
332  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
333  {
334    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
335    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
336  }
337
338  indexToInfoMappingLevel_.swap(indexToInfoMapping);
339  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
340  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
341                                                        it != client2ClientIndex.end(); ++it)
342      delete [] it->second;
343  delete tmpGlobalIndex;
344
345  if (0 != recvNbIndexCountOnReturn)
346  {
347    delete [] recvIndexBuffOnReturn;
348    delete [] recvInfoBuffOnReturn;
349  }
350
351  for (std::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
352                                                               it != client2ClientInfoOnReturn.end(); ++it)
353      delete [] it->second;
354
355  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
356                                            it != client2ClientIndexOnReturn.end(); ++it)
357      delete [] it->second;
358}
359
360/*!
361  Compute the hash index distribution over the whole size_t space; each client then owns one range of this distribution
362*/
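/*
  Illustration (a sketch using a toy maximum of 10 instead of std::numeric_limits<size_t>::max()):
  with nbClient = 4 the loop below yields hashedIndex = {0, 3, 5, 7, 10}, i.e. the hash space is
  split into the contiguous ranges [0,3), [3,5), [5,7) and [7,10], and client r owns range r.
  With the real size_t maximum the ranges are simply much larger.
*/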
363template<typename T, typename H>
364void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
365{
366  // Compute range of hash index for each client
367  hashedIndex.resize(nbClient+1);
368  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
369  size_t nbHashIndex;
370  hashedIndex[0] = 0;
371  for (int i = 1; i < nbClient; ++i)
372  {
373    nbHashIndex = nbHashIndexMax / nbClient;
374    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
375    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
376  }
377  hashedIndex[nbClient] = nbHashIndexMax;
378}
379
380/*!
381  Compute the distribution of global indices for servers.
382  Each client already holds a piece of information and its associated index.
383This information will be redistributed among processes by projecting the indices onto the size_t space,
384and the corresponding information will be distributed over that space as well.
385After the redistribution, each client holds the rearranged indices and their corresponding information.
386  \param [in] indexInfoMap index and its corresponding info (usually server index)
387  \param [in] commLevel communicator of the current level
388  \param [in] level current level
389*/
390template<typename T, typename H>
391void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
392                                                            const ep_lib::MPI_Comm& commLevel,
393                                                            int level)
394{
395  int clientRank;
396  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
397  computeSendRecvRank(level, clientRank);
398
399  int groupRankBegin = this->getGroupBegin()[level];
400  int nbClient = this->getNbInGroup()[level];
401  std::vector<size_t> hashedIndex;
402  computeHashIndex(hashedIndex, nbClient);
403
404  std::vector<int> sendBuff(nbClient,0);
405  std::vector<int> sendNbIndexBuff(nbClient,0);
406  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
407                                      iteClientHash = hashedIndex.end();
408  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
409                                                   ite = indexInfoMap.end();
410  HashXIOS<size_t> hashGlobalIndex;
411
412  // Compute the size of the sending and receiving buffers
413  for (it = itb; it != ite; ++it)
414  {
415    size_t hashIndex = hashGlobalIndex(it->first);
416    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
417    int indexClient = std::distance(itbClientHash, itClientHash)-1;
418    sendNbIndexBuff[indexClient] += it->second.size();
419  }
420
421  std::unordered_map<int, size_t*> client2ClientIndex;
422  std::unordered_map<int, unsigned char*> client2ClientInfo;
423  for (int idx = 0; idx < nbClient; ++idx)
424  {
425    if (0 != sendNbIndexBuff[idx])
426    {
427      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
428      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
429      sendNbIndexBuff[idx] = 0;
430      sendBuff[idx] = 1;
431    }
432  }
433
434  std::vector<int> sendNbInfo(nbClient,0);
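  // sendNbInfo[i] is the byte offset already written into client i's info buffer; packElement()
  // presumably serializes one InfoType there and advances the offset (by
  // ProcessDHTElement<InfoType>::typeSize() bytes per element), keeping the info buffer aligned
  // with the index buffer filled in the same loop.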
435  for (it = itb; it != ite; ++it)
436  {
437    const std::vector<InfoType>& infoTmp = it->second;
438    size_t hashIndex = hashGlobalIndex(it->first);
439    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
440    int indexClient = std::distance(itbClientHash, itClientHash)-1;
441    for (int idx = 0; idx < infoTmp.size(); ++idx)
442    {
443      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
444  //          ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
445      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
446      ++sendNbIndexBuff[indexClient];
447    }
448  }
449
450  // Calculate from how many clients each client receives messages.
451  // Calculate the size of the buffer for receiving messages
452  std::vector<int> recvRankClient, recvNbIndexClientCount;
453  sendRecvRank(level, sendBuff, sendNbIndexBuff,
454               recvRankClient, recvNbIndexClientCount);
455
456  int recvNbIndexCount = 0;
457  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
458    recvNbIndexCount += recvNbIndexClientCount[idx];
459
460  unsigned long* recvIndexBuff;
461  unsigned char* recvInfoBuff;
462  if (0 != recvNbIndexCount)
463  {
464    recvIndexBuff = new unsigned long[recvNbIndexCount];
465    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
466  }
467
468  // If a client holds indices and the corresponding information that do not belong to it,
469  // it will send a message to the correct clients.
470  // The contents of the message are the indices and their corresponding information.
471  int request_size = 0;
472  for (int idx = 0; idx < recvRankClient.size(); ++idx)
473  {
474    if (0 != recvNbIndexClientCount[idx])
475    {
476      request_size += 2;
477    }
478  }
479
480  request_size += client2ClientIndex.size();
481  request_size += client2ClientInfo.size();
482
483  std::vector<ep_lib::MPI_Request> request(request_size);
484  int currentIndex = 0;
485  int nbRecvClient = recvRankClient.size();
486  int request_position=0;
487  for (int idx = 0; idx < nbRecvClient; ++idx)
488  {
489    if (0 != recvNbIndexClientCount[idx])
490    {
491        recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
492        recvInfoFromClients(recvRankClient[idx],
493                            recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
494                            recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
495                            commLevel, &request[request_position++]);
496    }
497    currentIndex += recvNbIndexClientCount[idx];
498  }
499
500  std::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
501                                                iteIndex = client2ClientIndex.end();
502  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
503    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);
504  std::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
505                                                      iteInfo = client2ClientInfo.end();
506  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
507    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, &request[request_position++]);
508
509  std::vector<ep_lib::MPI_Status> status(request.size());
510  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);
511
512  Index2VectorInfoTypeMap indexToInfoMapping;
513  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
514  currentIndex = 0;
515  InfoType infoValue;
516  int infoIndex = 0;
517  unsigned char* infoBuff = recvInfoBuff;
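  // Unpack the received (index, info) pairs: infoIndex is the running byte offset into the
  // contiguous info buffer and is advanced by unpackElement() as elements are read.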
518  for (int idx = 0; idx < nbRecvClient; ++idx)
519  {
520    size_t index;
521    int count = recvNbIndexClientCount[idx];
522    for (int i = 0; i < count; ++i)
523    {
524      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
525      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
526    }
527    currentIndex += count;
528  }
529
530  if (0 != recvNbIndexCount)
531  {
532    delete [] recvIndexBuff;
533    delete [] recvInfoBuff;
534  }
535  for (std::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
536                                                               it != client2ClientInfo.end(); ++it)
537      delete [] it->second;
538
539  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
540                                                        it != client2ClientIndex.end(); ++it)
541      delete [] it->second;
542
543  // Recurse to the next level if there is one; otherwise keep the final mapping
544  if (0 < level)
545  {
546    --level;
547    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
548  }
549  else
550    index2InfoMapping_.swap(indexToInfoMapping);
551}
552
553/*!
554  Send a message containing indices to a client
555  \param [in] clientDestRank rank of the destination client
556  \param [in] indices indices to send
557  \param [in] indiceSize size of the index array to send
558  \param [in] clientIntraComm communication group of clients
559  \param [in] requestSendIndex list of sending requests
560*/
561template<typename T, typename H>
562void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
563                                                       const ep_lib::MPI_Comm& clientIntraComm,
564                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
565{
566  ep_lib::MPI_Request request;
567  requestSendIndex.push_back(request);
568  ep_lib::MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
569            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
570}
571
572/*!
573  Send a message containing indices to a client
574  \param [in] clientDestRank rank of the destination client
575  \param [in] indices indices to send
576  \param [in] indiceSize size of the index array to send
577  \param [in] clientIntraComm communication group of clients
578  \param [in] requestSendIndex sending request
579*/
580template<typename T, typename H>
581void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
582                                                       const ep_lib::MPI_Comm& clientIntraComm,
583                                                       ep_lib::MPI_Request* requestSendIndex)
584{
585  ep_lib::MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
586            clientDestRank, MPI_DHT_INDEX, clientIntraComm, requestSendIndex);
587}
588
589/*!
590  Receive a message containing indices from another client
591  \param [in] clientSrcRank rank of the source client
592  \param [in] indices buffer in which the received indices are stored
593  \param [in] clientIntraComm communication group of clients
594  \param [in] requestRecvIndex list of receiving requests
595*/
596template<typename T, typename H>
597void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
598                                                         const ep_lib::MPI_Comm& clientIntraComm,
599                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
600{
601  ep_lib::MPI_Request request;
602  requestRecvIndex.push_back(request);
603  ep_lib::MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
604            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
605}
606
607/*!
608  Receive a message containing indices from another client
609  \param [in] clientSrcRank rank of the source client
610  \param [in] indices buffer in which the received indices are stored
611  \param [in] clientIntraComm communication group of clients
612  \param [in] requestRecvIndex receiving request
613*/
614template<typename T, typename H>
615void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
616                                                         const ep_lib::MPI_Comm& clientIntraComm,
617                                                         ep_lib::MPI_Request *requestRecvIndex)
618{
619  ep_lib::MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
620            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, requestRecvIndex);
621}
622
623/*!
624  Send a message containing information to a client
625  \param [in] clientDestRank rank of the destination client
626  \param [in] info info array to send
627  \param [in] infoSize size of the info array to send
628  \param [in] clientIntraComm communication group of clients
629  \param [in] requestSendInfo list of sending requests
630*/
631template<typename T, typename H>
632void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
633                                                      const ep_lib::MPI_Comm& clientIntraComm,
634                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
635{
636  ep_lib::MPI_Request request;
637  requestSendInfo.push_back(request);
638
639  ep_lib::MPI_Isend(info, infoSize, MPI_CHAR,
640            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
641}
642
643/*!
644  Send a message containing information to a client
645  \param [in] clientDestRank rank of the destination client
646  \param [in] info info array to send
647  \param [in] infoSize size of the info array to send
648  \param [in] clientIntraComm communication group of clients
649  \param [in] requestSendInfo sending request
650*/
651template<typename T, typename H>
652void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
653                                                      const ep_lib::MPI_Comm& clientIntraComm,
654                                                      ep_lib::MPI_Request *requestSendInfo)
655{
656  ep_lib::MPI_Isend(info, infoSize, MPI_CHAR,
657            clientDestRank, MPI_DHT_INFO, clientIntraComm, requestSendInfo);
658}
659
660/*!
661  Receive a message containing information from another client
662  \param [in] clientSrcRank rank of the source client
663  \param [in] info info array to receive
664  \param [in] infoSize size of the info array to receive
665  \param [in] clientIntraComm communication group of clients
666  \param [in] requestRecvInfo list of receiving requests
667*/
668template<typename T, typename H>
669void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
670                                                        const ep_lib::MPI_Comm& clientIntraComm,
671                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
672{
673  ep_lib::MPI_Request request;
674  requestRecvInfo.push_back(request);
675
676  ep_lib::MPI_Irecv(info, infoSize, MPI_CHAR,
677            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
678}
679
680/*!
681  Receive a message containing information from another client
682  \param [in] clientSrcRank rank of the source client
683  \param [in] info info array to receive
684  \param [in] infoSize size of the info array to receive
685  \param [in] clientIntraComm communication group of clients
686  \param [in] requestRecvInfo receiving request
687*/
688template<typename T, typename H>
689void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
690                                                        const ep_lib::MPI_Comm& clientIntraComm,
691                                                        ep_lib::MPI_Request* requestRecvInfo)
692{
693  ep_lib::MPI_Irecv(info, infoSize, MPI_CHAR,
694            clientSrcRank, MPI_DHT_INFO, clientIntraComm, requestRecvInfo);
695}
696
697/*!
698  Compute how many processes one process needs to send to and from how many processes it will receive.
699  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes).
700*/
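/*
  Sketch of the logic below, in terms of hash ranges: at this level the calling rank owns the
  range [a,b] of the hash space. It must receive from every parent-group rank whose range [c,d]
  overlaps [a,b], and it must send to the ranks of its own group whose ranges overlap [e,f],
  the range the calling rank owns in its parent group; those send ranks are recovered with
  lower_bound/upper_bound on the hash boundaries of the current group.
*/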
701template<typename T, typename H>
702void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
703{
704  int groupBegin = this->getGroupBegin()[level];
705  int nbInGroup  = this->getNbInGroup()[level];
706  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
707  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
708
709  std::vector<size_t> hashedIndexGroup;
710  computeHashIndex(hashedIndexGroup, nbInGroup);
711  size_t a = hashedIndexGroup[rank-groupBegin];
712  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
713
714  int currentGroup, offset;
715  size_t e,f;
716
717  // Simple interval test: does [a,b] intersect [c,d]?
718  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
719  {
720    std::vector<size_t> hashedIndexGroupParent;
721    int nbInGroupParent = nbInGroupParents[idx];
722    if (0 != nbInGroupParent)
723      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
724    for (int i = 0; i < nbInGroupParent; ++i)
725    {
726      size_t c = hashedIndexGroupParent[i];
727      size_t d = hashedIndexGroupParent[i+1]-1;
728
729      if (!((d < a) || (b < c)))
730        recvRank_[level].push_back(groupParentBegin[idx]+i);
731    }
732
733    offset = rank - groupParentBegin[idx];
734    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
735    {
736      e = hashedIndexGroupParent[offset];
737      f = hashedIndexGroupParent[offset+1]-1;
738    }
739  }
740
741  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
742                                      iteHashGroup = hashedIndexGroup.end();
743  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
744  int begin = std::distance(itbHashGroup, itHashGroup)-1;
745  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
746  int end = std::distance(itbHashGroup, itHashGroup) -1;
747  sendRank_[level].resize(end-begin+1);
748  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
749}
750
751/*!
752  Compute the number of clients, as well as the corresponding number of elements, each client will receive when the search results are returned
753  \param [in] sendNbRank Ranks of the clients to send to
754  \param [in] sendNbElements Number of elements to send to each client
755  \param [in] recvNbRank Ranks of the clients to receive from
756  \param [out] recvNbElements Number of elements to receive from each client
757*/
758template<typename T, typename H>
759void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
760                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
761{
762  recvNbElements.resize(recvNbRank.size());
763  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
764  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
765
766  int nRequest = 0;
767  for (int idx = 0; idx < recvNbRank.size(); ++idx)
768  {
769    ep_lib::MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
770              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
771    ++nRequest;
772  }
773
774  for (int idx = 0; idx < sendNbRank.size(); ++idx)
775  {
776    ep_lib::MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
777              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
778    ++nRequest;
779  }
780
781  ep_lib::MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
782}
783
784/*!
785  Send and receive the number of processes each process needs to listen to, as well as the number
786  of indices it will receive, during the initialization phase
787  \param [in] level current level
788  \param [in] sendNbRank Ranks of the clients to send to
789  \param [in] sendNbElements Number of elements to send to each client
790  \param [out] recvNbRank Ranks of the clients to receive from
791  \param [out] recvNbElements Number of elements to receive from each client
792*/
793template<typename T, typename H>
794void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
795                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
796                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
797{
798  int groupBegin = this->getGroupBegin()[level];
799
800  int offSet = 0;
801  std::vector<int>& sendRank = sendRank_[level];
802  std::vector<int>& recvRank = recvRank_[level];
803  int sendBuffSize = sendRank.size();
804  std::vector<int> sendBuff(sendBuffSize*2);
805  int recvBuffSize = recvRank.size();
806  std::vector<int> recvBuff(recvBuffSize*2,0);
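  // Each exchanged message is a pair of ints: the first is non-zero when the sender actually
  // has indices for the receiver, the second is the number of index elements it will send;
  // entries whose first value is zero are discarded after the wait below.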
807
808  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
809  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
810  int nRequest = 0;
811  for (int idx = 0; idx < recvBuffSize; ++idx)
812  {
813    ep_lib::MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
814              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest++]);
815  }
816
817  for (int idx = 0; idx < sendBuffSize; ++idx)
818  {
819    offSet = sendRank[idx]-groupBegin;
820    sendBuff[idx*2] = sendNbRank[offSet];
821    sendBuff[idx*2+1] = sendNbElements[offSet];
822  }
823
824  for (int idx = 0; idx < sendBuffSize; ++idx)
825  {
826    ep_lib::MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
827              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest++]);
828  }
829
830  ep_lib::MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
831  int nbRecvRank = 0, nbRecvElements = 0;
832  recvNbRank.clear();
833  recvNbElements.clear();
834  for (int idx = 0; idx < recvBuffSize; ++idx)
835  {
836    if (0 != recvBuff[2*idx])
837    {
838      recvNbRank.push_back(recvRank[idx]);
839      recvNbElements.push_back(recvBuff[2*idx+1]);
840    }
841  }
842}
843
844}