source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 863

Last change on this file since 863 was 863, checked in by mhnguyen, 5 years ago

Some minor improvements for DHT

+) Use to minimize the redundant loops

Test
+) ON Curie
+) It works well.

File size: 46.5 KB
Line 
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 Mars 2016
6
7   \brief Distributed hashed table implementation.
8 */
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12
13namespace xios
14{
15/*!
16  Constructor with initial distribution information and the corresponding index
17  Each client (process) holds a piece of information as well as the attached index, the index
18will be redistributed (projected) into size_t space as long as the associated information.
19  \param [in] indexInfoMap initial index and information mapping
20  \param [in] clientIntraComm communicator of clients
21  \param [in] hierarLvl level of hierarchy
22*/
23template<typename T, typename H>
24CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
25                                                        const MPI_Comm& clientIntraComm)
26  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
27{
28  MPI_Comm_size(clientIntraComm, &nbClient_);
29  this->computeMPICommLevel();
30  int nbLvl = this->getNbLevel();
31  sendRank_.resize(nbLvl);
32  recvRank_.resize(nbLvl);
33  Index2VectorInfoTypeMap indexToVecInfoMap;
34  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
35  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
36  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
37  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
38}
39
40/*!
41  Constructor with initial distribution information and the corresponding index
42  Each client (process) holds a piece of information as well as the attached index, the index
43will be redistributed (projected) into size_t space as long as the associated information.
44  \param [in] indexInfoMap initial index and information mapping
45  \param [in] clientIntraComm communicator of clients
46  \param [in] hierarLvl level of hierarchy
47*/
48template<typename T, typename H>
49CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
50                                                        const MPI_Comm& clientIntraComm)
51  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
52{
53  MPI_Comm_size(clientIntraComm, &nbClient_);
54  this->computeMPICommLevel();
55  int nbLvl = this->getNbLevel();
56  sendRank_.resize(nbLvl);
57  recvRank_.resize(nbLvl);
58  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
59}
60
61template<typename T, typename H>
62CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
63{
64}
65
66/*!
67  Compute mapping between indices and information corresponding to these indices
68  \param [in] indices indices a proc has
69*/
70template<typename T, typename H>
71void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
72{
73  int nbLvl = this->getNbLevel();
74  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
75}
76
77/*!
78    Compute mapping between indices and information corresponding to these indices
79for each level of hierarchical DHT. Recursive function
80   \param [in] indices indices a proc has
81   \param [in] commLevel communicator of current level
82   \param [in] level current level
83*/
84template<typename T, typename H>
85void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
86                                                                 const MPI_Comm& commLevel,
87                                                                 int level)
88{
89  int clientRank;
90  MPI_Comm_rank(commLevel,&clientRank);
91  int groupRankBegin = this->getGroupBegin()[level];
92  int nbClient = this->getNbInGroup()[level];
93  std::vector<size_t> hashedIndex;
94  computeHashIndex(hashedIndex, nbClient);
95
96  size_t ssize = indices.numElements(), hashedVal;
97
98  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
99                                      iteClientHash = hashedIndex.end();
100  std::vector<int> sendBuff(nbClient,0);
101  std::vector<int> sendNbIndexBuff(nbClient,0);
102
103  // Number of global index whose mapping server are on other clients
104  int nbIndexToSend = 0;
105  size_t index;
106  HashXIOS<size_t> hashGlobalIndex;
107  boost::unordered_map<size_t,int> nbIndices;
108  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
109  for (int i = 0; i < ssize; ++i)
110  {
111    index = indices(i);
112    if (0 == nbIndices.count(index))
113    {
114      hashedVal  = hashGlobalIndex(index);
115      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
116      int indexClient = std::distance(itbClientHash, itClientHash)-1;
117      ++sendNbIndexBuff[indexClient];
118      nbIndices[index] = 1;
119    }
120  }
121
122  boost::unordered_map<int, size_t* > client2ClientIndex;
123  for (int idx = 0; idx < nbClient; ++idx)
124  {
125    if (0 != sendNbIndexBuff[idx])
126    {
127      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
128      nbIndexToSend += sendNbIndexBuff[idx];
129      sendBuff[idx] = 1;
130      sendNbIndexBuff[idx] = 0;
131    }
132  }
133
134  for (int i = 0; i < ssize; ++i)
135  {
136    index = indices(i);
137    if (1 == nbIndices[index])
138    {
139      hashedVal  = hashGlobalIndex(index);
140      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
141      int indexClient = std::distance(itbClientHash, itClientHash)-1;
142      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
143      ++sendNbIndexBuff[indexClient];
144      ++nbIndices[index];
145    }
146  }
147
148  std::vector<int> recvRankClient, recvNbIndexClientCount;
149  sendRecvRank(level, sendBuff, sendNbIndexBuff,
150               recvRankClient, recvNbIndexClientCount);
151
152  int recvNbIndexCount = 0;
153  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
154    recvNbIndexCount += recvNbIndexClientCount[idx];
155
156  unsigned long* recvIndexBuff;
157  if (0 != recvNbIndexCount)
158    recvIndexBuff = new unsigned long[recvNbIndexCount];
159
160  std::vector<MPI_Request> request;
161  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
162                             iteRecvIndex = recvRankClient.end(),
163                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
164                           itRecvNbIndex;
165  int currentIndex = 0;
166  int nbRecvClient = recvRankClient.size();
167  for (int idx = 0; idx < nbRecvClient; ++idx)
168  {
169    if (0 != recvNbIndexClientCount[idx])
170      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
171    currentIndex += recvNbIndexClientCount[idx];
172  }
173
174  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
175                                                iteIndex = client2ClientIndex.end();
176  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
177    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
178
179  std::vector<MPI_Status> status(request.size());
180  MPI_Waitall(request.size(), &request[0], &status[0]);
181
182  CArray<size_t,1>* tmpGlobalIndex;
183  if (0 != recvNbIndexCount)
184    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
185  else
186    tmpGlobalIndex = new CArray<size_t,1>();
187
188  // OK, we go to the next level and do something recursive
189  if (0 < level)
190  {
191    --level;
192    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
193  }
194  else // Now, we are in the last level where necessary mappings are.
195    indexToInfoMappingLevel_= (index2InfoMapping_);
196
197  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
198  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
199  currentIndex = 0;
200  for (int idx = 0; idx < nbRecvClient; ++idx)
201  {
202    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
203    {
204      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
205      if (iteIndexToInfoMap != itIndexToInfoMap)
206        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
207//        ++sendNbIndexOnReturn[idx];
208    }
209    currentIndex += recvNbIndexClientCount[idx];
210  }
211
212  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
213  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
214  int indexIndex = 0;
215  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
216  {
217    recvRankOnReturn[indexIndex] = itIndex->first;
218  }
219  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
220                   recvRankOnReturn, recvNbIndexOnReturn);
221
222  int recvNbIndexCountOnReturn = 0;
223  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
224    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
225
226  unsigned long* recvIndexBuffOnReturn;
227  unsigned char* recvInfoBuffOnReturn;
228  if (0 != recvNbIndexCountOnReturn)
229  {
230    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
231    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
232  }
233
234  std::vector<MPI_Request> requestOnReturn;
235  currentIndex = 0;
236  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
237  {
238    if (0 != recvNbIndexOnReturn[idx])
239    {
240      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
241      recvInfoFromClients(recvRankOnReturn[idx],
242                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
243                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
244                          commLevel, requestOnReturn);
245    }
246    currentIndex += recvNbIndexOnReturn[idx];
247  }
248
249  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
250  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
251  currentIndex = 0;
252  for (int idx = 0; idx < nbRecvClient; ++idx)
253  {
254    if (0 != sendNbIndexOnReturn[idx])
255    {
256      int rank = recvRankClient[idx];
257      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
258      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
259      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
260      int infoIndex = 0;
261      int nb = 0;
262      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
263      {
264        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
265        if (iteIndexToInfoMap != itIndexToInfoMap)
266        {
267          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
268          for (int k = 0; k < infoTmp.size(); ++k)
269          {
270            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
271            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
272            ++nb;
273          }
274        }
275      }
276
277      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
278                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
279      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
280                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
281    }
282    currentIndex += recvNbIndexClientCount[idx];
283  }
284
285  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
286  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
287
288  Index2VectorInfoTypeMap indexToInfoMapping;
289  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
290  int infoIndex = 0;
291  InfoType unpackedInfo;
292  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
293  {
294    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
295    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
296//    ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex);
297  }
298
299  indexToInfoMappingLevel_.swap(indexToInfoMapping); //indexToInfoMappingLevel_ = (indexToInfoMapping);
300  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
301  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
302                                                        it != client2ClientIndex.end(); ++it)
303      delete [] it->second;
304  delete tmpGlobalIndex;
305
306  if (0 != recvNbIndexCountOnReturn)
307  {
308    delete [] recvIndexBuffOnReturn;
309    delete [] recvInfoBuffOnReturn;
310  }
311
312  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
313                                                               it != client2ClientInfoOnReturn.end(); ++it)
314      delete [] it->second;
315
316  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
317                                            it != client2ClientIndexOnReturn.end(); ++it)
318      delete [] it->second;
319}
320
321///*!
322//    Compute mapping between indices and information corresponding to these indices
323//for each level of hierarchical DHT. Recursive function
324//   \param [in] indices indices a proc has
325//   \param [in] commLevel communicator of current level
326//   \param [in] level current level
327//*/
328//template<typename T, typename H>
329//void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
330//                                                                 const MPI_Comm& commLevel,
331//                                                                 int level)
332//{
333//  int clientRank;
334//  MPI_Comm_rank(commLevel,&clientRank);
335//  int groupRankBegin = this->getGroupBegin()[level];
336//  int nbClient = this->getNbInGroup()[level];
337//  std::vector<size_t> hashedIndex;
338//  computeHashIndex(hashedIndex, nbClient);
339//
340//  size_t ssize = indices.numElements(), hashedVal;
341//
342//  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
343//                                      iteClientHash = hashedIndex.end();
344//  std::vector<int> sendBuff(nbClient,0);
345//  std::vector<int> sendNbIndexBuff(nbClient,0);
346//
347//  // Number of global index whose mapping server are on other clients
348//  int nbIndexToSend = 0;
349//  size_t index;
350//  HashXIOS<size_t> hashGlobalIndex;
351//  for (int i = 0; i < ssize; ++i)
352//  {
353//    index = indices(i);
354//    hashedVal  = hashGlobalIndex(index);
355//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
356//    int indexClient = std::distance(itbClientHash, itClientHash)-1;
357//    ++sendNbIndexBuff[indexClient];
358//  }
359//
360//  boost::unordered_map<int, size_t* > client2ClientIndex;
361//  for (int idx = 0; idx < nbClient; ++idx)
362//  {
363//    if (0 != sendNbIndexBuff[idx])
364//    {
365//      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
366//      nbIndexToSend += sendNbIndexBuff[idx];
367//      sendBuff[idx] = 1;
368//      sendNbIndexBuff[idx] = 0;
369//    }
370//  }
371//
372//  for (int i = 0; i < ssize; ++i)
373//  {
374//    index = indices(i);
375//    hashedVal  = hashGlobalIndex(index);
376//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
377//    {
378//      int indexClient = std::distance(itbClientHash, itClientHash)-1;
379//      {
380//        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
381//        ++sendNbIndexBuff[indexClient];
382//      }
383//    }
384//  }
385//
386//  std::vector<int> recvRankClient, recvNbIndexClientCount;
387//  sendRecvRank(level, sendBuff, sendNbIndexBuff,
388//               recvRankClient, recvNbIndexClientCount);
389//
390//  int recvNbIndexCount = 0;
391//  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
392//    recvNbIndexCount += recvNbIndexClientCount[idx];
393//
394//  unsigned long* recvIndexBuff;
395//  if (0 != recvNbIndexCount)
396//    recvIndexBuff = new unsigned long[recvNbIndexCount];
397//
398//  std::vector<MPI_Request> request;
399//  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
400//                             iteRecvIndex = recvRankClient.end(),
401//                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
402//                           itRecvNbIndex;
403//  int currentIndex = 0;
404//  int nbRecvClient = recvRankClient.size();
405//  for (int idx = 0; idx < nbRecvClient; ++idx)
406//  {
407//    if (0 != recvNbIndexClientCount[idx])
408//      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
409//    currentIndex += recvNbIndexClientCount[idx];
410//  }
411//
412//  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
413//                                                iteIndex = client2ClientIndex.end();
414//  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
415//    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
416//
417//  std::vector<MPI_Status> status(request.size());
418//  MPI_Waitall(request.size(), &request[0], &status[0]);
419//
420//  CArray<size_t,1>* tmpGlobalIndex;
421//  if (0 != recvNbIndexCount)
422//    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
423//  else
424//    tmpGlobalIndex = new CArray<size_t,1>();
425//
426//  // OK, we go to the next level and do something recursive
427//  if (0 < level)
428//  {
429//    --level;
430//    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
431//  }
432//  else // Now, we are in the last level where necessary mappings are.
433//    indexToInfoMappingLevel_= (index2InfoMapping_);
434//
435//  typename Index2InfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
436//  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
437//  currentIndex = 0;
438//  for (int idx = 0; idx < nbRecvClient; ++idx)
439//  {
440//    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
441//    {
442//      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
443//      if (iteIndexToInfoMap != itIndexToInfoMap) ++sendNbIndexOnReturn[idx];
444//    }
445//    currentIndex += recvNbIndexClientCount[idx];
446//  }
447//
448//  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
449//  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
450//  int indexIndex = 0;
451//  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
452//  {
453//    recvRankOnReturn[indexIndex] = itIndex->first;
454//  }
455//  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
456//                   recvRankOnReturn, recvNbIndexOnReturn);
457//
458//  int recvNbIndexCountOnReturn = 0;
459//  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
460//    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
461//
462//  unsigned long* recvIndexBuffOnReturn;
463//  unsigned char* recvInfoBuffOnReturn;
464//  if (0 != recvNbIndexCountOnReturn)
465//  {
466//    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
467//    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
468//  }
469//
470//  std::vector<MPI_Request> requestOnReturn;
471//  currentIndex = 0;
472//  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
473//  {
474//    if (0 != recvNbIndexOnReturn[idx])
475//    {
476//      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
477//      recvInfoFromClients(recvRankOnReturn[idx],
478//                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
479//                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
480//                          commLevel, requestOnReturn);
481//    }
482//    currentIndex += recvNbIndexOnReturn[idx];
483//  }
484//
485//  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
486//  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
487//  currentIndex = 0;
488//  for (int idx = 0; idx < nbRecvClient; ++idx)
489//  {
490//    if (0 != sendNbIndexOnReturn[idx])
491//    {
492//      int rank = recvRankClient[idx];
493//      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
494//      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
495//      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
496//      int infoIndex = 0;
497//      int nb = 0;
498//      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
499//      {
500//        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
501//        if (iteIndexToInfoMap != itIndexToInfoMap)
502//        {
503//          client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
504//          ProcessDHTElement<InfoType>::packElement(itIndexToInfoMap->second, tmpInfoPtr, infoIndex);
505//          ++nb;
506//        }
507//      }
508//
509//      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
510//                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
511//      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
512//                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
513//    }
514//    currentIndex += recvNbIndexClientCount[idx];
515//  }
516//
517//  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
518//  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
519//
520//  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
521//  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
522//  int infoIndex = 0;
523//  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
524//  {
525//    ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex);
526//  }
527//
528//  indexToInfoMappingLevel_.swap(indexToInfoMapping); //indexToInfoMappingLevel_ = (indexToInfoMapping);
529//  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
530//  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
531//                                                        it != client2ClientIndex.end(); ++it)
532//      delete [] it->second;
533//  delete tmpGlobalIndex;
534//
535//  if (0 != recvNbIndexCountOnReturn)
536//  {
537//    delete [] recvIndexBuffOnReturn;
538//    delete [] recvInfoBuffOnReturn;
539//  }
540//
541//  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
542//                                                               it != client2ClientInfoOnReturn.end(); ++it)
543//      delete [] it->second;
544//
545//  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
546//                                            it != client2ClientIndexOnReturn.end(); ++it)
547//      delete [] it->second;
548//}
549
550/*!
551  Compute the hash index distribution of whole size_t space then each client will have a range of this distribution
552*/
553template<typename T, typename H>
554void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
555{
556  // Compute range of hash index for each client
557  hashedIndex.resize(nbClient+1);
558  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
559  size_t nbHashIndex;
560  hashedIndex[0] = 0;
561  for (int i = 1; i < nbClient; ++i)
562  {
563    nbHashIndex = nbHashIndexMax / nbClient;
564    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
565    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
566  }
567  hashedIndex[nbClient] = nbHashIndexMax;
568}
569
570/*!
571  Compute distribution of global index for servers
572  Each client already holds a piece of information and its associated index.
573This information will be redistributed among processes by projecting indices into size_t space,
574the corresponding information will be also distributed on size_t space.
575After the redistribution, each client holds rearranged index and its corresponding information.
576  \param [in] indexInfoMap index and its corresponding info (usually server index)
577  \param [in] commLevel communicator of current level
578  \param [in] level current level
579*/
580template<typename T, typename H>
581void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
582                                                            const MPI_Comm& commLevel,
583                                                            int level)
584{
585  int clientRank;
586  MPI_Comm_rank(commLevel,&clientRank);
587  computeSendRecvRank(level, clientRank);
588
589  int groupRankBegin = this->getGroupBegin()[level];
590  int nbClient = this->getNbInGroup()[level];
591  std::vector<size_t> hashedIndex;
592  computeHashIndex(hashedIndex, nbClient);
593
594  std::vector<int> sendBuff(nbClient,0);
595  std::vector<int> sendNbIndexBuff(nbClient,0);
596  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
597                                      iteClientHash = hashedIndex.end();
598  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
599                                                   ite = indexInfoMap.end();
600  HashXIOS<size_t> hashGlobalIndex;
601
602  // Compute size of sending and receving buffer
603  for (it = itb; it != ite; ++it)
604  {
605    size_t hashIndex = hashGlobalIndex(it->first);
606    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
607    int indexClient = std::distance(itbClientHash, itClientHash)-1;
608    sendNbIndexBuff[indexClient] += it->second.size();
609  }
610
611  boost::unordered_map<int, size_t*> client2ClientIndex;
612  boost::unordered_map<int, unsigned char*> client2ClientInfo;
613  for (int idx = 0; idx < nbClient; ++idx)
614  {
615    if (0 != sendNbIndexBuff[idx])
616    {
617      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
618      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
619      sendNbIndexBuff[idx] = 0;
620      sendBuff[idx] = 1;
621    }
622  }
623
624  std::vector<int> sendNbInfo(nbClient,0);
625  for (it = itb; it != ite; ++it)
626  {
627    const std::vector<InfoType>& infoTmp = it->second;
628    size_t hashIndex = hashGlobalIndex(it->first);
629    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
630    int indexClient = std::distance(itbClientHash, itClientHash)-1;
631    for (int idx = 0; idx < infoTmp.size(); ++idx)
632    {
633      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;;
634  //          ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
635      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
636      ++sendNbIndexBuff[indexClient];
637    }
638  }
639
640  // Calculate from how many clients each client receive message.
641  // Calculate size of buffer for receiving message
642  std::vector<int> recvRankClient, recvNbIndexClientCount;
643  sendRecvRank(level, sendBuff, sendNbIndexBuff,
644               recvRankClient, recvNbIndexClientCount);
645
646  int recvNbIndexCount = 0;
647  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
648    recvNbIndexCount += recvNbIndexClientCount[idx];
649
650  unsigned long* recvIndexBuff;
651  unsigned char* recvInfoBuff;
652  if (0 != recvNbIndexCount)
653  {
654    recvIndexBuff = new unsigned long[recvNbIndexCount];
655    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
656  }
657
658  // If a client holds information about index and the corresponding which don't belong to it,
659  // it will send a message to the correct clients.
660  // Contents of the message are index and its corresponding informatioin
661  std::vector<MPI_Request> request;
662  int currentIndex = 0;
663  int nbRecvClient = recvRankClient.size();
664  for (int idx = 0; idx < nbRecvClient; ++idx)
665  {
666    if (0 != recvNbIndexClientCount[idx])
667    {
668      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
669      recvInfoFromClients(recvRankClient[idx],
670                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
671                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
672                          commLevel, request);
673    }
674    currentIndex += recvNbIndexClientCount[idx];
675  }
676
677  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
678                                                iteIndex = client2ClientIndex.end();
679  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
680    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
681  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
682                                                      iteInfo = client2ClientInfo.end();
683  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
684    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
685
686  std::vector<MPI_Status> status(request.size());
687  MPI_Waitall(request.size(), &request[0], &status[0]);
688
689  Index2VectorInfoTypeMap indexToInfoMapping;
690  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
691//  boost::unordered_map<size_t,int> tmpInfoSize;
692//  tmpInfoSize.rehash(std::ceil(currentIndex/tmpInfoSize.max_load_factor()));
693//  currentIndex = 0;
694//  for (int idx = 0; idx < nbRecvClient; ++idx)
695//  {
696//    int count = recvNbIndexClientCount[idx];
697//    for (int i = 0; i < count; ++i)
698//    {
699//      ++tmpInfoSize[*(recvIndexBuff+currentIndex+i)];
700//    }
701//    currentIndex += count;
702//  }
703//
704//  for (boost::unordered_map<size_t,int>::iterator it=tmpInfoSize.begin(); it != tmpInfoSize.end(); ++it)
705//  {
706//    indexToInfoMapping[it->first].resize(it->second);
707//    it->second = 0;
708//  }
709
710  currentIndex = 0;
711  InfoType infoValue;
712  int infoIndex = 0;
713  unsigned char* infoBuff = recvInfoBuff;
714  for (int idx = 0; idx < nbRecvClient; ++idx)
715  {
716//    size_t index;
717    int count = recvNbIndexClientCount[idx];
718    for (int i = 0; i < count; ++i)
719    {
720//      index = *(recvIndexBuff+currentIndex+i);
721      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
722//      ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[index][tmpInfoSize[index]], infoBuff, infoIndex);
723//      ++tmpInfoSize[index];
724      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
725    }
726    currentIndex += count;
727  }
728
729  if (0 != recvNbIndexCount)
730  {
731    delete [] recvIndexBuff;
732    delete [] recvInfoBuff;
733  }
734  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
735                                                               it != client2ClientInfo.end(); ++it)
736      delete [] it->second;
737
738  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
739                                                        it != client2ClientIndex.end(); ++it)
740      delete [] it->second;
741
742  // Ok, now do something recursive
743  if (0 < level)
744  {
745    --level;
746    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
747  }
748  else
749    index2InfoMapping_.swap(indexToInfoMapping);
750}
751
752///*!
753//  Compute distribution of global index for servers
754//  Each client already holds a piece of information and its associated index.
755//This information will be redistributed among processes by projecting indices into size_t space,
756//the corresponding information will be also distributed on size_t space.
757//After the redistribution, each client holds rearranged index and its corresponding information.
758//  \param [in] indexInfoMap index and its corresponding info (usually server index)
759//  \param [in] commLevel communicator of current level
760//  \param [in] level current level
761//*/
762//template<typename T, typename H>
763//void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
764//                                                            const MPI_Comm& commLevel,
765//                                                            int level)
766//{
767//  int clientRank;
768//  MPI_Comm_rank(commLevel,&clientRank);
769//  computeSendRecvRank(level, clientRank);
770//
771//  int groupRankBegin = this->getGroupBegin()[level];
772//  int nbClient = this->getNbInGroup()[level];
773//  std::vector<size_t> hashedIndex;
774//  computeHashIndex(hashedIndex, nbClient);
775//
776//  std::vector<int> sendBuff(nbClient,0);
777//  std::vector<int> sendNbIndexBuff(nbClient,0);
778//  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
779//                                      iteClientHash = hashedIndex.end();
780//  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(),it,
781//                                                                 ite = indexInfoMap.end();
782//  HashXIOS<size_t> hashGlobalIndex;
783//
784//  // Compute size of sending and receving buffer
785//  for (it = itb; it != ite; ++it)
786//  {
787//    size_t hashIndex = hashGlobalIndex(it->first);
788//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
789//    {
790//      int indexClient = std::distance(itbClientHash, itClientHash)-1;
791//      {
792//        ++sendNbIndexBuff[indexClient];
793//      }
794//    }
795//  }
796//
797//  boost::unordered_map<int, size_t*> client2ClientIndex;
798//  boost::unordered_map<int, unsigned char*> client2ClientInfo;
799//  for (int idx = 0; idx < nbClient; ++idx)
800//  {
801//    if (0 != sendNbIndexBuff[idx])
802//    {
803//      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
804//      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
805//      sendNbIndexBuff[idx] = 0;
806//      sendBuff[idx] = 1;
807//    }
808//  }
809//
810//  std::vector<int> sendNbInfo(nbClient,0);
811//  for (it = itb; it != ite; ++it)
812//  {
813//    size_t hashIndex = hashGlobalIndex(it->first);
814//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
815//    {
816//      int indexClient = std::distance(itbClientHash, itClientHash)-1;
817//      {
818//        client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;;
819//        ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
820//        ++sendNbIndexBuff[indexClient];
821//      }
822//    }
823//  }
824//
825//  // Calculate from how many clients each client receive message.
826//  // Calculate size of buffer for receiving message
827//  std::vector<int> recvRankClient, recvNbIndexClientCount;
828//  sendRecvRank(level, sendBuff, sendNbIndexBuff,
829//               recvRankClient, recvNbIndexClientCount);
830//
831//  int recvNbIndexCount = 0;
832//  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
833//    recvNbIndexCount += recvNbIndexClientCount[idx];
834//
835//  unsigned long* recvIndexBuff;
836//  unsigned char* recvInfoBuff;
837//  if (0 != recvNbIndexCount)
838//  {
839//    recvIndexBuff = new unsigned long[recvNbIndexCount];
840//    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
841//  }
842//
843//  // If a client holds information about index and the corresponding which don't belong to it,
844//  // it will send a message to the correct clients.
845//  // Contents of the message are index and its corresponding informatioin
846//  std::vector<MPI_Request> request;
847//  int currentIndex = 0;
848//  int nbRecvClient = recvRankClient.size();
849//  for (int idx = 0; idx < nbRecvClient; ++idx)
850//  {
851//    if (0 != recvNbIndexClientCount[idx])
852//    {
853//      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
854//      recvInfoFromClients(recvRankClient[idx],
855//                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
856//                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
857//                          commLevel, request);
858//    }
859//    currentIndex += recvNbIndexClientCount[idx];
860//  }
861//
862//  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
863//                                                iteIndex = client2ClientIndex.end();
864//  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
865//    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
866//  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
867//                                                      iteInfo = client2ClientInfo.end();
868//  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
869//    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
870//
871//  std::vector<MPI_Status> status(request.size());
872//  MPI_Waitall(request.size(), &request[0], &status[0]);
873//
874//  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
875//  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
876//  currentIndex = 0;
877//  InfoType infoValue;
878//  int infoIndex = 0;
879//  unsigned char* infoBuff = recvInfoBuff;
880//  for (int idx = 0; idx < nbRecvClient; ++idx)
881//  {
882//    int count = recvNbIndexClientCount[idx];
883//    for (int i = 0; i < count; ++i)
884//    {
885//      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
886//      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)] = infoValue;
887//    }
888//    currentIndex += count;
889//  }
890//
891//  if (0 != recvNbIndexCount)
892//  {
893//    delete [] recvIndexBuff;
894//    delete [] recvInfoBuff;
895//  }
896//  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
897//                                                               it != client2ClientInfo.end(); ++it)
898//      delete [] it->second;
899//
900//  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
901//                                                        it != client2ClientIndex.end(); ++it)
902//      delete [] it->second;
903//
904//  // Ok, now do something recursive
905//  if (0 < level)
906//  {
907//    --level;
908//    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
909//  }
910//  else
911//    index2InfoMapping_.swap(indexToInfoMapping); //index2InfoMapping_ = (indexToInfoMapping);
912//}
913
914/*!
915  Send message containing index to clients
916  \param [in] clientDestRank rank of destination client
917  \param [in] indices index to send
918  \param [in] indiceSize size of index array to send
919  \param [in] clientIntraComm communication group of client
920  \param [in] requestSendIndex list of sending request
921*/
922template<typename T, typename H>
923void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
924                                                       const MPI_Comm& clientIntraComm,
925                                                       std::vector<MPI_Request>& requestSendIndex)
926{
927  MPI_Request request;
928  requestSendIndex.push_back(request);
929  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
930            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
931}
932
933/*!
934  Receive message containing index to clients
935  \param [in] clientDestRank rank of destination client
936  \param [in] indices index to send
937  \param [in] clientIntraComm communication group of client
938  \param [in] requestRecvIndex list of receiving request
939*/
940template<typename T, typename H>
941void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
942                                                         const MPI_Comm& clientIntraComm,
943                                                         std::vector<MPI_Request>& requestRecvIndex)
944{
945  MPI_Request request;
946  requestRecvIndex.push_back(request);
947  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
948            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
949}
950
951/*!
952  Send message containing information to clients
953  \param [in] clientDestRank rank of destination client
954  \param [in] info info array to send
955  \param [in] infoSize info array size to send
956  \param [in] clientIntraComm communication group of client
957  \param [in] requestSendInfo list of sending request
958*/
959template<typename T, typename H>
960void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
961                                                      const MPI_Comm& clientIntraComm,
962                                                      std::vector<MPI_Request>& requestSendInfo)
963{
964  MPI_Request request;
965  requestSendInfo.push_back(request);
966
967  MPI_Isend(info, infoSize, MPI_CHAR,
968            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
969}
970
971/*!
972  Receive message containing information from other clients
973  \param [in] clientDestRank rank of destination client
974  \param [in] info info array to receive
975  \param [in] infoSize info array size to receive
976  \param [in] clientIntraComm communication group of client
977  \param [in] requestRecvInfo list of sending request
978*/
979template<typename T, typename H>
980void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
981                                                        const MPI_Comm& clientIntraComm,
982                                                        std::vector<MPI_Request>& requestRecvInfo)
983{
984  MPI_Request request;
985  requestRecvInfo.push_back(request);
986
987  MPI_Irecv(info, infoSize, MPI_CHAR,
988            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
989}
990
991/*!
992  Compute how many processes one process needs to send to and from how many processes it will receive.
993  This computation is only based on hierachical structure of distributed hash table (e.x: number of processes)
994*/
995template<typename T, typename H>
996void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
997{
998  int groupBegin = this->getGroupBegin()[level];
999  int nbInGroup  = this->getNbInGroup()[level];
1000  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
1001  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
1002
1003  std::vector<size_t> hashedIndexGroup;
1004  computeHashIndex(hashedIndexGroup, nbInGroup);
1005  size_t a = hashedIndexGroup[rank-groupBegin];
1006  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
1007
1008  int currentGroup, offset;
1009  size_t e,f;
1010
1011  // Do a simple math [a,b) intersect [c,d)
1012  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
1013  {
1014    std::vector<size_t> hashedIndexGroupParent;
1015    int nbInGroupParent = nbInGroupParents[idx];
1016    if (0 != nbInGroupParent)
1017      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
1018    for (int i = 0; i < nbInGroupParent; ++i)
1019    {
1020      size_t c = hashedIndexGroupParent[i];
1021      size_t d = hashedIndexGroupParent[i+1]-1;
1022
1023    if (!((d < a) || (b <c)))
1024        recvRank_[level].push_back(groupParentBegin[idx]+i);
1025    }
1026
1027    offset = rank - groupParentBegin[idx];
1028    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
1029    {
1030      e = hashedIndexGroupParent[offset];
1031      f = hashedIndexGroupParent[offset+1]-1;
1032    }
1033  }
1034
1035  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
1036                                      iteHashGroup = hashedIndexGroup.end();
1037  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
1038  int begin = std::distance(itbHashGroup, itHashGroup)-1;
1039  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
1040  int end = std::distance(itbHashGroup, itHashGroup) -1;
1041  sendRank_[level].resize(end-begin+1);
1042  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
1043}
1044
1045/*!
1046  Compute number of clients as well as corresponding number of elements each client will receive on returning searching result
1047  \param [in] sendNbRank Rank of clients to send to
1048  \param [in] sendNbElements Number of elements each client to send to
1049  \param [in] receiveNbRank Rank of clients to receive from
1050  \param [out] recvNbElements Number of elements each client to send to
1051*/
1052template<typename T, typename H>
1053void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
1054                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
1055{
1056  recvNbElements.resize(recvNbRank.size());
1057  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
1058  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
1059
1060  int nRequest = 0;
1061  for (int idx = 0; idx < recvNbRank.size(); ++idx)
1062  {
1063    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
1064              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1065    ++nRequest;
1066  }
1067
1068  for (int idx = 0; idx < sendNbRank.size(); ++idx)
1069  {
1070    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
1071              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1072    ++nRequest;
1073  }
1074
1075  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
1076}
1077
1078/*!
1079  Send and receive number of process each process need to listen to as well as number
1080  of index it will receive during the initalization phase
1081  \param [in] level current level
1082  \param [in] sendNbRank Rank of clients to send to
1083  \param [in] sendNbElements Number of elements each client to send to
1084  \param [out] receiveNbRank Rank of clients to receive from
1085  \param [out] recvNbElements Number of elements each client to send to
1086*/
1087template<typename T, typename H>
1088void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
1089                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
1090                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
1091{
1092  int groupBegin = this->getGroupBegin()[level];
1093
1094  int offSet = 0;
1095  std::vector<int>& sendRank = sendRank_[level];
1096  std::vector<int>& recvRank = recvRank_[level];
1097  int sendBuffSize = sendRank.size();
1098  std::vector<int> sendBuff(sendBuffSize*2);
1099  int recvBuffSize = recvRank.size();
1100  std::vector<int> recvBuff(recvBuffSize*2,0);
1101
1102  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
1103  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
1104
1105  int nRequest = 0;
1106  for (int idx = 0; idx < recvBuffSize; ++idx)
1107  {
1108    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
1109              recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1110    ++nRequest;
1111  }
1112
1113  for (int idx = 0; idx < sendBuffSize; ++idx)
1114  {
1115    offSet = sendRank[idx]-groupBegin;
1116    sendBuff[idx*2] = sendNbRank[offSet];
1117    sendBuff[idx*2+1] = sendNbElements[offSet];
1118  }
1119
1120  for (int idx = 0; idx < sendBuffSize; ++idx)
1121  {
1122    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
1123              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1124    ++nRequest;
1125  }
1126
1127  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
1128  int nbRecvRank = 0, nbRecvElements = 0;
1129  recvNbRank.clear();
1130  recvNbElements.clear();
1131  for (int idx = 0; idx < recvBuffSize; ++idx)
1132  {
1133    if (0 != recvBuff[2*idx])
1134    {
1135      recvNbRank.push_back(recvRank[idx]);
1136      recvNbElements.push_back(recvBuff[2*idx+1]);
1137    }
1138  }
1139}
1140
1141}
Note: See TracBrowser for help on using the repository browser.