source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 860

Last change on this file since 860 was 860, checked in by mhnguyen, 5 years ago

Improvement of DHT: allow multiple values correspond to one key

+) If there are several returned values corresponding to one key,
all of them are returned by DHT in vector.

Test
+) Testing only very basic. This commit serves as temporary one.

File size: 45.7 KB
Line 
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 Mars 2016
6
7   \brief Distributed hashed table implementation.
8 */
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12
13namespace xios
14{
15/*!
16  Constructor with initial distribution information and the corresponding index
17  Each client (process) holds a piece of information as well as the attached index, the index
18will be redistributed (projected) into size_t space as long as the associated information.
19  \param [in] indexInfoMap initial index and information mapping
20  \param [in] clientIntraComm communicator of clients
21  \param [in] hierarLvl level of hierarchy
22*/
23template<typename T, typename H>
24CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
25                                                        const MPI_Comm& clientIntraComm)
26  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
27{
28  MPI_Comm_size(clientIntraComm, &nbClient_);
29  this->computeMPICommLevel();
30  int nbLvl = this->getNbLevel();
31  sendRank_.resize(nbLvl);
32  recvRank_.resize(nbLvl);
33  Index2VectorInfoTypeMap indexToVecInfoMap;
34  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
35  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
36  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
37  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
38}
39
40/*!
41  Constructor with initial distribution information and the corresponding index
42  Each client (process) holds a piece of information as well as the attached index, the index
43will be redistributed (projected) into size_t space as long as the associated information.
44  \param [in] indexInfoMap initial index and information mapping
45  \param [in] clientIntraComm communicator of clients
46  \param [in] hierarLvl level of hierarchy
47*/
48template<typename T, typename H>
49CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
50                                                        const MPI_Comm& clientIntraComm)
51  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
52{
53  MPI_Comm_size(clientIntraComm, &nbClient_);
54  this->computeMPICommLevel();
55  int nbLvl = this->getNbLevel();
56  sendRank_.resize(nbLvl);
57  recvRank_.resize(nbLvl);
58  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
59}
60
61template<typename T, typename H>
62CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
63{
64}
65
66/*!
67  Compute mapping between indices and information corresponding to these indices
68  \param [in] indices indices a proc has
69*/
70template<typename T, typename H>
71void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
72{
73  int nbLvl = this->getNbLevel();
74  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
75}
76
77/*!
78    Compute mapping between indices and information corresponding to these indices
79for each level of hierarchical DHT. Recursive function
80   \param [in] indices indices a proc has
81   \param [in] commLevel communicator of current level
82   \param [in] level current level
83*/
84template<typename T, typename H>
85void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
86                                                                 const MPI_Comm& commLevel,
87                                                                 int level)
88{
89  int clientRank;
90  MPI_Comm_rank(commLevel,&clientRank);
91  int groupRankBegin = this->getGroupBegin()[level];
92  int nbClient = this->getNbInGroup()[level];
93  std::vector<size_t> hashedIndex;
94  computeHashIndex(hashedIndex, nbClient);
95
96  size_t ssize = indices.numElements(), hashedVal;
97
98  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
99                                      iteClientHash = hashedIndex.end();
100  std::vector<int> sendBuff(nbClient,0);
101  std::vector<int> sendNbIndexBuff(nbClient,0);
102
103  // Number of global index whose mapping server are on other clients
104  int nbIndexToSend = 0;
105  size_t index;
106  HashXIOS<size_t> hashGlobalIndex;
107  for (int i = 0; i < ssize; ++i)
108  {
109    index = indices(i);
110    hashedVal  = hashGlobalIndex(index);
111    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
112    int indexClient = std::distance(itbClientHash, itClientHash)-1;
113    ++sendNbIndexBuff[indexClient];
114  }
115
116  boost::unordered_map<int, size_t* > client2ClientIndex;
117  for (int idx = 0; idx < nbClient; ++idx)
118  {
119    if (0 != sendNbIndexBuff[idx])
120    {
121      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
122      nbIndexToSend += sendNbIndexBuff[idx];
123      sendBuff[idx] = 1;
124      sendNbIndexBuff[idx] = 0;
125    }
126  }
127
128  for (int i = 0; i < ssize; ++i)
129  {
130    index = indices(i);
131    hashedVal  = hashGlobalIndex(index);
132    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
133    {
134      int indexClient = std::distance(itbClientHash, itClientHash)-1;
135      {
136        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
137        ++sendNbIndexBuff[indexClient];
138      }
139    }
140  }
141
142  std::vector<int> recvRankClient, recvNbIndexClientCount;
143  sendRecvRank(level, sendBuff, sendNbIndexBuff,
144               recvRankClient, recvNbIndexClientCount);
145
146  int recvNbIndexCount = 0;
147  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
148    recvNbIndexCount += recvNbIndexClientCount[idx];
149
150  unsigned long* recvIndexBuff;
151  if (0 != recvNbIndexCount)
152    recvIndexBuff = new unsigned long[recvNbIndexCount];
153
154  std::vector<MPI_Request> request;
155  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
156                             iteRecvIndex = recvRankClient.end(),
157                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
158                           itRecvNbIndex;
159  int currentIndex = 0;
160  int nbRecvClient = recvRankClient.size();
161  for (int idx = 0; idx < nbRecvClient; ++idx)
162  {
163    if (0 != recvNbIndexClientCount[idx])
164      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
165    currentIndex += recvNbIndexClientCount[idx];
166  }
167
168  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
169                                                iteIndex = client2ClientIndex.end();
170  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
171    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
172
173  std::vector<MPI_Status> status(request.size());
174  MPI_Waitall(request.size(), &request[0], &status[0]);
175
176  CArray<size_t,1>* tmpGlobalIndex;
177  if (0 != recvNbIndexCount)
178    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
179  else
180    tmpGlobalIndex = new CArray<size_t,1>();
181
182  // OK, we go to the next level and do something recursive
183  if (0 < level)
184  {
185    --level;
186    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
187  }
188  else // Now, we are in the last level where necessary mappings are.
189    indexToInfoMappingLevel_= (index2InfoMapping_);
190
191  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
192  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
193  currentIndex = 0;
194  for (int idx = 0; idx < nbRecvClient; ++idx)
195  {
196    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
197    {
198      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
199      if (iteIndexToInfoMap != itIndexToInfoMap)
200        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
201//        ++sendNbIndexOnReturn[idx];
202    }
203    currentIndex += recvNbIndexClientCount[idx];
204  }
205
206  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
207  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
208  int indexIndex = 0;
209  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
210  {
211    recvRankOnReturn[indexIndex] = itIndex->first;
212  }
213  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
214                   recvRankOnReturn, recvNbIndexOnReturn);
215
216  int recvNbIndexCountOnReturn = 0;
217  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
218    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
219
220  unsigned long* recvIndexBuffOnReturn;
221  unsigned char* recvInfoBuffOnReturn;
222  if (0 != recvNbIndexCountOnReturn)
223  {
224    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
225    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
226  }
227
228  std::vector<MPI_Request> requestOnReturn;
229  currentIndex = 0;
230  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
231  {
232    if (0 != recvNbIndexOnReturn[idx])
233    {
234      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
235      recvInfoFromClients(recvRankOnReturn[idx],
236                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
237                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
238                          commLevel, requestOnReturn);
239    }
240    currentIndex += recvNbIndexOnReturn[idx];
241  }
242
243  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
244  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
245  currentIndex = 0;
246  for (int idx = 0; idx < nbRecvClient; ++idx)
247  {
248    if (0 != sendNbIndexOnReturn[idx])
249    {
250      int rank = recvRankClient[idx];
251      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
252      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
253      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
254      int infoIndex = 0;
255      int nb = 0;
256      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
257      {
258        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
259        if (iteIndexToInfoMap != itIndexToInfoMap)
260        {
261          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
262          for (int k = 0; k < infoTmp.size(); ++k)
263          {
264            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
265            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
266            ++nb;
267          }
268        }
269      }
270
271      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
272                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
273      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
274                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
275    }
276    currentIndex += recvNbIndexClientCount[idx];
277  }
278
279  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
280  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
281
282  Index2VectorInfoTypeMap indexToInfoMapping;
283  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
284  int infoIndex = 0;
285  InfoType unpackedInfo;
286  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
287  {
288    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
289    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
290//    ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex);
291  }
292
293  indexToInfoMappingLevel_.swap(indexToInfoMapping); //indexToInfoMappingLevel_ = (indexToInfoMapping);
294  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
295  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
296                                                        it != client2ClientIndex.end(); ++it)
297      delete [] it->second;
298  delete tmpGlobalIndex;
299
300  if (0 != recvNbIndexCountOnReturn)
301  {
302    delete [] recvIndexBuffOnReturn;
303    delete [] recvInfoBuffOnReturn;
304  }
305
306  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
307                                                               it != client2ClientInfoOnReturn.end(); ++it)
308      delete [] it->second;
309
310  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
311                                            it != client2ClientIndexOnReturn.end(); ++it)
312      delete [] it->second;
313}
314
315///*!
316//    Compute mapping between indices and information corresponding to these indices
317//for each level of hierarchical DHT. Recursive function
318//   \param [in] indices indices a proc has
319//   \param [in] commLevel communicator of current level
320//   \param [in] level current level
321//*/
322//template<typename T, typename H>
323//void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
324//                                                                 const MPI_Comm& commLevel,
325//                                                                 int level)
326//{
327//  int clientRank;
328//  MPI_Comm_rank(commLevel,&clientRank);
329//  int groupRankBegin = this->getGroupBegin()[level];
330//  int nbClient = this->getNbInGroup()[level];
331//  std::vector<size_t> hashedIndex;
332//  computeHashIndex(hashedIndex, nbClient);
333//
334//  size_t ssize = indices.numElements(), hashedVal;
335//
336//  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
337//                                      iteClientHash = hashedIndex.end();
338//  std::vector<int> sendBuff(nbClient,0);
339//  std::vector<int> sendNbIndexBuff(nbClient,0);
340//
341//  // Number of global index whose mapping server are on other clients
342//  int nbIndexToSend = 0;
343//  size_t index;
344//  HashXIOS<size_t> hashGlobalIndex;
345//  for (int i = 0; i < ssize; ++i)
346//  {
347//    index = indices(i);
348//    hashedVal  = hashGlobalIndex(index);
349//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
350//    int indexClient = std::distance(itbClientHash, itClientHash)-1;
351//    ++sendNbIndexBuff[indexClient];
352//  }
353//
354//  boost::unordered_map<int, size_t* > client2ClientIndex;
355//  for (int idx = 0; idx < nbClient; ++idx)
356//  {
357//    if (0 != sendNbIndexBuff[idx])
358//    {
359//      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
360//      nbIndexToSend += sendNbIndexBuff[idx];
361//      sendBuff[idx] = 1;
362//      sendNbIndexBuff[idx] = 0;
363//    }
364//  }
365//
366//  for (int i = 0; i < ssize; ++i)
367//  {
368//    index = indices(i);
369//    hashedVal  = hashGlobalIndex(index);
370//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
371//    {
372//      int indexClient = std::distance(itbClientHash, itClientHash)-1;
373//      {
374//        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
375//        ++sendNbIndexBuff[indexClient];
376//      }
377//    }
378//  }
379//
380//  std::vector<int> recvRankClient, recvNbIndexClientCount;
381//  sendRecvRank(level, sendBuff, sendNbIndexBuff,
382//               recvRankClient, recvNbIndexClientCount);
383//
384//  int recvNbIndexCount = 0;
385//  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
386//    recvNbIndexCount += recvNbIndexClientCount[idx];
387//
388//  unsigned long* recvIndexBuff;
389//  if (0 != recvNbIndexCount)
390//    recvIndexBuff = new unsigned long[recvNbIndexCount];
391//
392//  std::vector<MPI_Request> request;
393//  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
394//                             iteRecvIndex = recvRankClient.end(),
395//                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
396//                           itRecvNbIndex;
397//  int currentIndex = 0;
398//  int nbRecvClient = recvRankClient.size();
399//  for (int idx = 0; idx < nbRecvClient; ++idx)
400//  {
401//    if (0 != recvNbIndexClientCount[idx])
402//      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
403//    currentIndex += recvNbIndexClientCount[idx];
404//  }
405//
406//  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
407//                                                iteIndex = client2ClientIndex.end();
408//  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
409//    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
410//
411//  std::vector<MPI_Status> status(request.size());
412//  MPI_Waitall(request.size(), &request[0], &status[0]);
413//
414//  CArray<size_t,1>* tmpGlobalIndex;
415//  if (0 != recvNbIndexCount)
416//    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
417//  else
418//    tmpGlobalIndex = new CArray<size_t,1>();
419//
420//  // OK, we go to the next level and do something recursive
421//  if (0 < level)
422//  {
423//    --level;
424//    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
425//  }
426//  else // Now, we are in the last level where necessary mappings are.
427//    indexToInfoMappingLevel_= (index2InfoMapping_);
428//
429//  typename Index2InfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
430//  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
431//  currentIndex = 0;
432//  for (int idx = 0; idx < nbRecvClient; ++idx)
433//  {
434//    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
435//    {
436//      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
437//      if (iteIndexToInfoMap != itIndexToInfoMap) ++sendNbIndexOnReturn[idx];
438//    }
439//    currentIndex += recvNbIndexClientCount[idx];
440//  }
441//
442//  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
443//  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
444//  int indexIndex = 0;
445//  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
446//  {
447//    recvRankOnReturn[indexIndex] = itIndex->first;
448//  }
449//  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
450//                   recvRankOnReturn, recvNbIndexOnReturn);
451//
452//  int recvNbIndexCountOnReturn = 0;
453//  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
454//    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
455//
456//  unsigned long* recvIndexBuffOnReturn;
457//  unsigned char* recvInfoBuffOnReturn;
458//  if (0 != recvNbIndexCountOnReturn)
459//  {
460//    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
461//    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
462//  }
463//
464//  std::vector<MPI_Request> requestOnReturn;
465//  currentIndex = 0;
466//  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
467//  {
468//    if (0 != recvNbIndexOnReturn[idx])
469//    {
470//      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
471//      recvInfoFromClients(recvRankOnReturn[idx],
472//                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
473//                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
474//                          commLevel, requestOnReturn);
475//    }
476//    currentIndex += recvNbIndexOnReturn[idx];
477//  }
478//
479//  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
480//  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
481//  currentIndex = 0;
482//  for (int idx = 0; idx < nbRecvClient; ++idx)
483//  {
484//    if (0 != sendNbIndexOnReturn[idx])
485//    {
486//      int rank = recvRankClient[idx];
487//      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
488//      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
489//      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
490//      int infoIndex = 0;
491//      int nb = 0;
492//      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
493//      {
494//        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
495//        if (iteIndexToInfoMap != itIndexToInfoMap)
496//        {
497//          client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
498//          ProcessDHTElement<InfoType>::packElement(itIndexToInfoMap->second, tmpInfoPtr, infoIndex);
499//          ++nb;
500//        }
501//      }
502//
503//      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
504//                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
505//      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
506//                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
507//    }
508//    currentIndex += recvNbIndexClientCount[idx];
509//  }
510//
511//  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
512//  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
513//
514//  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
515//  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
516//  int infoIndex = 0;
517//  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
518//  {
519//    ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[recvIndexBuffOnReturn[idx]], recvInfoBuffOnReturn, infoIndex);
520//  }
521//
522//  indexToInfoMappingLevel_.swap(indexToInfoMapping); //indexToInfoMappingLevel_ = (indexToInfoMapping);
523//  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
524//  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
525//                                                        it != client2ClientIndex.end(); ++it)
526//      delete [] it->second;
527//  delete tmpGlobalIndex;
528//
529//  if (0 != recvNbIndexCountOnReturn)
530//  {
531//    delete [] recvIndexBuffOnReturn;
532//    delete [] recvInfoBuffOnReturn;
533//  }
534//
535//  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
536//                                                               it != client2ClientInfoOnReturn.end(); ++it)
537//      delete [] it->second;
538//
539//  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
540//                                            it != client2ClientIndexOnReturn.end(); ++it)
541//      delete [] it->second;
542//}
543
544/*!
545  Compute the hash index distribution of whole size_t space then each client will have a range of this distribution
546*/
547template<typename T, typename H>
548void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
549{
550  // Compute range of hash index for each client
551  hashedIndex.resize(nbClient+1);
552  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
553  size_t nbHashIndex;
554  hashedIndex[0] = 0;
555  for (int i = 1; i < nbClient; ++i)
556  {
557    nbHashIndex = nbHashIndexMax / nbClient;
558    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
559    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
560  }
561  hashedIndex[nbClient] = nbHashIndexMax;
562}
563
564/*!
565  Compute distribution of global index for servers
566  Each client already holds a piece of information and its associated index.
567This information will be redistributed among processes by projecting indices into size_t space,
568the corresponding information will be also distributed on size_t space.
569After the redistribution, each client holds rearranged index and its corresponding information.
570  \param [in] indexInfoMap index and its corresponding info (usually server index)
571  \param [in] commLevel communicator of current level
572  \param [in] level current level
573*/
574template<typename T, typename H>
575void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
576                                                            const MPI_Comm& commLevel,
577                                                            int level)
578{
579  int clientRank;
580  MPI_Comm_rank(commLevel,&clientRank);
581  computeSendRecvRank(level, clientRank);
582
583  int groupRankBegin = this->getGroupBegin()[level];
584  int nbClient = this->getNbInGroup()[level];
585  std::vector<size_t> hashedIndex;
586  computeHashIndex(hashedIndex, nbClient);
587
588  std::vector<int> sendBuff(nbClient,0);
589  std::vector<int> sendNbIndexBuff(nbClient,0);
590  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
591                                      iteClientHash = hashedIndex.end();
592  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
593                                                   ite = indexInfoMap.end();
594  HashXIOS<size_t> hashGlobalIndex;
595
596  // Compute size of sending and receving buffer
597  for (it = itb; it != ite; ++it)
598  {
599    int infoVecSize = it->second.size();
600    for (int idx = 0; idx < infoVecSize; ++idx)
601    {
602      size_t hashIndex = hashGlobalIndex(it->first);
603      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
604      {
605        int indexClient = std::distance(itbClientHash, itClientHash)-1;
606        {
607          ++sendNbIndexBuff[indexClient];
608        }
609      }
610    }
611  }
612
613  boost::unordered_map<int, size_t*> client2ClientIndex;
614  boost::unordered_map<int, unsigned char*> client2ClientInfo;
615  for (int idx = 0; idx < nbClient; ++idx)
616  {
617    if (0 != sendNbIndexBuff[idx])
618    {
619      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
620      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
621      sendNbIndexBuff[idx] = 0;
622      sendBuff[idx] = 1;
623    }
624  }
625
626  std::vector<int> sendNbInfo(nbClient,0);
627  for (it = itb; it != ite; ++it)
628  {
629    const std::vector<InfoType>& infoTmp = it->second;
630    for (int idx = 0; idx < infoTmp.size(); ++idx)
631    {
632      size_t hashIndex = hashGlobalIndex(it->first);
633      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
634      {
635        int indexClient = std::distance(itbClientHash, itClientHash)-1;
636        {
637          client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;;
638//          ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
639          ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
640          ++sendNbIndexBuff[indexClient];
641        }
642      }
643    }
644  }
645
646  // Calculate from how many clients each client receive message.
647  // Calculate size of buffer for receiving message
648  std::vector<int> recvRankClient, recvNbIndexClientCount;
649  sendRecvRank(level, sendBuff, sendNbIndexBuff,
650               recvRankClient, recvNbIndexClientCount);
651
652  int recvNbIndexCount = 0;
653  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
654    recvNbIndexCount += recvNbIndexClientCount[idx];
655
656  unsigned long* recvIndexBuff;
657  unsigned char* recvInfoBuff;
658  if (0 != recvNbIndexCount)
659  {
660    recvIndexBuff = new unsigned long[recvNbIndexCount];
661    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
662  }
663
664  // If a client holds information about index and the corresponding which don't belong to it,
665  // it will send a message to the correct clients.
666  // Contents of the message are index and its corresponding informatioin
667  std::vector<MPI_Request> request;
668  int currentIndex = 0;
669  int nbRecvClient = recvRankClient.size();
670  for (int idx = 0; idx < nbRecvClient; ++idx)
671  {
672    if (0 != recvNbIndexClientCount[idx])
673    {
674      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
675      recvInfoFromClients(recvRankClient[idx],
676                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
677                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
678                          commLevel, request);
679    }
680    currentIndex += recvNbIndexClientCount[idx];
681  }
682
683  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
684                                                iteIndex = client2ClientIndex.end();
685  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
686    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
687  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
688                                                      iteInfo = client2ClientInfo.end();
689  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
690    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
691
692  std::vector<MPI_Status> status(request.size());
693  MPI_Waitall(request.size(), &request[0], &status[0]);
694
695  Index2VectorInfoTypeMap indexToInfoMapping;
696  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
697  currentIndex = 0;
698  InfoType infoValue;
699  int infoIndex = 0;
700  unsigned char* infoBuff = recvInfoBuff;
701  for (int idx = 0; idx < nbRecvClient; ++idx)
702  {
703    int count = recvNbIndexClientCount[idx];
704    for (int i = 0; i < count; ++i)
705    {
706      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
707//      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)] = infoValue;
708      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
709    }
710    currentIndex += count;
711  }
712
713  if (0 != recvNbIndexCount)
714  {
715    delete [] recvIndexBuff;
716    delete [] recvInfoBuff;
717  }
718  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
719                                                               it != client2ClientInfo.end(); ++it)
720      delete [] it->second;
721
722  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
723                                                        it != client2ClientIndex.end(); ++it)
724      delete [] it->second;
725
726  // Ok, now do something recursive
727  if (0 < level)
728  {
729    --level;
730    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
731  }
732  else
733    index2InfoMapping_.swap(indexToInfoMapping);
734}
735
736///*!
737//  Compute distribution of global index for servers
738//  Each client already holds a piece of information and its associated index.
739//This information will be redistributed among processes by projecting indices into size_t space,
740//the corresponding information will be also distributed on size_t space.
741//After the redistribution, each client holds rearranged index and its corresponding information.
742//  \param [in] indexInfoMap index and its corresponding info (usually server index)
743//  \param [in] commLevel communicator of current level
744//  \param [in] level current level
745//*/
746//template<typename T, typename H>
747//void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
748//                                                            const MPI_Comm& commLevel,
749//                                                            int level)
750//{
751//  int clientRank;
752//  MPI_Comm_rank(commLevel,&clientRank);
753//  computeSendRecvRank(level, clientRank);
754//
755//  int groupRankBegin = this->getGroupBegin()[level];
756//  int nbClient = this->getNbInGroup()[level];
757//  std::vector<size_t> hashedIndex;
758//  computeHashIndex(hashedIndex, nbClient);
759//
760//  std::vector<int> sendBuff(nbClient,0);
761//  std::vector<int> sendNbIndexBuff(nbClient,0);
762//  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
763//                                      iteClientHash = hashedIndex.end();
764//  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(),it,
765//                                                                 ite = indexInfoMap.end();
766//  HashXIOS<size_t> hashGlobalIndex;
767//
768//  // Compute size of sending and receving buffer
769//  for (it = itb; it != ite; ++it)
770//  {
771//    size_t hashIndex = hashGlobalIndex(it->first);
772//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
773//    {
774//      int indexClient = std::distance(itbClientHash, itClientHash)-1;
775//      {
776//        ++sendNbIndexBuff[indexClient];
777//      }
778//    }
779//  }
780//
781//  boost::unordered_map<int, size_t*> client2ClientIndex;
782//  boost::unordered_map<int, unsigned char*> client2ClientInfo;
783//  for (int idx = 0; idx < nbClient; ++idx)
784//  {
785//    if (0 != sendNbIndexBuff[idx])
786//    {
787//      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
788//      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
789//      sendNbIndexBuff[idx] = 0;
790//      sendBuff[idx] = 1;
791//    }
792//  }
793//
794//  std::vector<int> sendNbInfo(nbClient,0);
795//  for (it = itb; it != ite; ++it)
796//  {
797//    size_t hashIndex = hashGlobalIndex(it->first);
798//    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
799//    {
800//      int indexClient = std::distance(itbClientHash, itClientHash)-1;
801//      {
802//        client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;;
803//        ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
804//        ++sendNbIndexBuff[indexClient];
805//      }
806//    }
807//  }
808//
809//  // Calculate from how many clients each client receive message.
810//  // Calculate size of buffer for receiving message
811//  std::vector<int> recvRankClient, recvNbIndexClientCount;
812//  sendRecvRank(level, sendBuff, sendNbIndexBuff,
813//               recvRankClient, recvNbIndexClientCount);
814//
815//  int recvNbIndexCount = 0;
816//  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
817//    recvNbIndexCount += recvNbIndexClientCount[idx];
818//
819//  unsigned long* recvIndexBuff;
820//  unsigned char* recvInfoBuff;
821//  if (0 != recvNbIndexCount)
822//  {
823//    recvIndexBuff = new unsigned long[recvNbIndexCount];
824//    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
825//  }
826//
827//  // If a client holds information about index and the corresponding which don't belong to it,
828//  // it will send a message to the correct clients.
829//  // Contents of the message are index and its corresponding informatioin
830//  std::vector<MPI_Request> request;
831//  int currentIndex = 0;
832//  int nbRecvClient = recvRankClient.size();
833//  for (int idx = 0; idx < nbRecvClient; ++idx)
834//  {
835//    if (0 != recvNbIndexClientCount[idx])
836//    {
837//      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
838//      recvInfoFromClients(recvRankClient[idx],
839//                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
840//                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
841//                          commLevel, request);
842//    }
843//    currentIndex += recvNbIndexClientCount[idx];
844//  }
845//
846//  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
847//                                                iteIndex = client2ClientIndex.end();
848//  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
849//    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
850//  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
851//                                                      iteInfo = client2ClientInfo.end();
852//  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
853//    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
854//
855//  std::vector<MPI_Status> status(request.size());
856//  MPI_Waitall(request.size(), &request[0], &status[0]);
857//
858//  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
859//  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
860//  currentIndex = 0;
861//  InfoType infoValue;
862//  int infoIndex = 0;
863//  unsigned char* infoBuff = recvInfoBuff;
864//  for (int idx = 0; idx < nbRecvClient; ++idx)
865//  {
866//    int count = recvNbIndexClientCount[idx];
867//    for (int i = 0; i < count; ++i)
868//    {
869//      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
870//      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)] = infoValue;
871//    }
872//    currentIndex += count;
873//  }
874//
875//  if (0 != recvNbIndexCount)
876//  {
877//    delete [] recvIndexBuff;
878//    delete [] recvInfoBuff;
879//  }
880//  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
881//                                                               it != client2ClientInfo.end(); ++it)
882//      delete [] it->second;
883//
884//  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
885//                                                        it != client2ClientIndex.end(); ++it)
886//      delete [] it->second;
887//
888//  // Ok, now do something recursive
889//  if (0 < level)
890//  {
891//    --level;
892//    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
893//  }
894//  else
895//    index2InfoMapping_.swap(indexToInfoMapping); //index2InfoMapping_ = (indexToInfoMapping);
896//}
897
898/*!
899  Send message containing index to clients
900  \param [in] clientDestRank rank of destination client
901  \param [in] indices index to send
902  \param [in] indiceSize size of index array to send
903  \param [in] clientIntraComm communication group of client
904  \param [in] requestSendIndex list of sending request
905*/
906template<typename T, typename H>
907void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
908                                                       const MPI_Comm& clientIntraComm,
909                                                       std::vector<MPI_Request>& requestSendIndex)
910{
911  MPI_Request request;
912  requestSendIndex.push_back(request);
913  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
914            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
915}
916
917/*!
918  Receive message containing index to clients
919  \param [in] clientDestRank rank of destination client
920  \param [in] indices index to send
921  \param [in] clientIntraComm communication group of client
922  \param [in] requestRecvIndex list of receiving request
923*/
924template<typename T, typename H>
925void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
926                                                         const MPI_Comm& clientIntraComm,
927                                                         std::vector<MPI_Request>& requestRecvIndex)
928{
929  MPI_Request request;
930  requestRecvIndex.push_back(request);
931  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
932            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
933}
934
935/*!
936  Send message containing information to clients
937  \param [in] clientDestRank rank of destination client
938  \param [in] info info array to send
939  \param [in] infoSize info array size to send
940  \param [in] clientIntraComm communication group of client
941  \param [in] requestSendInfo list of sending request
942*/
943template<typename T, typename H>
944void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
945                                                      const MPI_Comm& clientIntraComm,
946                                                      std::vector<MPI_Request>& requestSendInfo)
947{
948  MPI_Request request;
949  requestSendInfo.push_back(request);
950
951  MPI_Isend(info, infoSize, MPI_CHAR,
952            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
953}
954
955/*!
956  Receive message containing information from other clients
957  \param [in] clientDestRank rank of destination client
958  \param [in] info info array to receive
959  \param [in] infoSize info array size to receive
960  \param [in] clientIntraComm communication group of client
961  \param [in] requestRecvInfo list of sending request
962*/
963template<typename T, typename H>
964void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
965                                                        const MPI_Comm& clientIntraComm,
966                                                        std::vector<MPI_Request>& requestRecvInfo)
967{
968  MPI_Request request;
969  requestRecvInfo.push_back(request);
970
971  MPI_Irecv(info, infoSize, MPI_CHAR,
972            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
973}
974
975/*!
976  Compute how many processes one process needs to send to and from how many processes it will receive.
977  This computation is only based on hierachical structure of distributed hash table (e.x: number of processes)
978*/
979template<typename T, typename H>
980void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
981{
982  int groupBegin = this->getGroupBegin()[level];
983  int nbInGroup  = this->getNbInGroup()[level];
984  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
985  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
986
987  std::vector<size_t> hashedIndexGroup;
988  computeHashIndex(hashedIndexGroup, nbInGroup);
989  size_t a = hashedIndexGroup[rank-groupBegin];
990  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
991
992  int currentGroup, offset;
993  size_t e,f;
994
995  // Do a simple math [a,b) intersect [c,d)
996  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
997  {
998    std::vector<size_t> hashedIndexGroupParent;
999    int nbInGroupParent = nbInGroupParents[idx];
1000    if (0 != nbInGroupParent)
1001      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
1002    for (int i = 0; i < nbInGroupParent; ++i)
1003    {
1004      size_t c = hashedIndexGroupParent[i];
1005      size_t d = hashedIndexGroupParent[i+1]-1;
1006
1007    if (!((d < a) || (b <c)))
1008        recvRank_[level].push_back(groupParentBegin[idx]+i);
1009    }
1010
1011    offset = rank - groupParentBegin[idx];
1012    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
1013    {
1014      e = hashedIndexGroupParent[offset];
1015      f = hashedIndexGroupParent[offset+1]-1;
1016    }
1017  }
1018
1019  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
1020                                      iteHashGroup = hashedIndexGroup.end();
1021  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
1022  int begin = std::distance(itbHashGroup, itHashGroup)-1;
1023  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
1024  int end = std::distance(itbHashGroup, itHashGroup) -1;
1025  sendRank_[level].resize(end-begin+1);
1026  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
1027}
1028
1029/*!
1030  Compute number of clients as well as corresponding number of elements each client will receive on returning searching result
1031  \param [in] sendNbRank Rank of clients to send to
1032  \param [in] sendNbElements Number of elements each client to send to
1033  \param [in] receiveNbRank Rank of clients to receive from
1034  \param [out] recvNbElements Number of elements each client to send to
1035*/
1036template<typename T, typename H>
1037void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
1038                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
1039{
1040  recvNbElements.resize(recvNbRank.size());
1041  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
1042  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
1043
1044  int nRequest = 0;
1045  for (int idx = 0; idx < recvNbRank.size(); ++idx)
1046  {
1047    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
1048              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1049    ++nRequest;
1050  }
1051
1052  for (int idx = 0; idx < sendNbRank.size(); ++idx)
1053  {
1054    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
1055              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1056    ++nRequest;
1057  }
1058
1059  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
1060}
1061
1062/*!
1063  Send and receive number of process each process need to listen to as well as number
1064  of index it will receive during the initalization phase
1065  \param [in] level current level
1066  \param [in] sendNbRank Rank of clients to send to
1067  \param [in] sendNbElements Number of elements each client to send to
1068  \param [out] receiveNbRank Rank of clients to receive from
1069  \param [out] recvNbElements Number of elements each client to send to
1070*/
1071template<typename T, typename H>
1072void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
1073                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
1074                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
1075{
1076  int groupBegin = this->getGroupBegin()[level];
1077
1078  int offSet = 0;
1079  std::vector<int>& sendRank = sendRank_[level];
1080  std::vector<int>& recvRank = recvRank_[level];
1081  int sendBuffSize = sendRank.size();
1082  std::vector<int> sendBuff(sendBuffSize*2);
1083  int recvBuffSize = recvRank.size();
1084  std::vector<int> recvBuff(recvBuffSize*2,0);
1085
1086  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
1087  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
1088
1089  int nRequest = 0;
1090  for (int idx = 0; idx < recvBuffSize; ++idx)
1091  {
1092    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
1093              recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1094    ++nRequest;
1095  }
1096
1097  for (int idx = 0; idx < sendBuffSize; ++idx)
1098  {
1099    offSet = sendRank[idx]-groupBegin;
1100    sendBuff[idx*2] = sendNbRank[offSet];
1101    sendBuff[idx*2+1] = sendNbElements[offSet];
1102  }
1103
1104  for (int idx = 0; idx < sendBuffSize; ++idx)
1105  {
1106    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
1107              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
1108    ++nRequest;
1109  }
1110
1111  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
1112  int nbRecvRank = 0, nbRecvElements = 0;
1113  recvNbRank.clear();
1114  recvNbElements.clear();
1115  for (int idx = 0; idx < recvBuffSize; ++idx)
1116  {
1117    if (0 != recvBuff[2*idx])
1118    {
1119      recvNbRank.push_back(recvRank[idx]);
1120      recvNbElements.push_back(recvBuff[2*idx+1]);
1121    }
1122  }
1123}
1124
1125}
Note: See TracBrowser for help on using the repository browser.