source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 867

Last change on this file since 867 was 867, checked in by mhnguyen, 8 years ago

Various clean up

+) Remove some redundant codes

Test
+) On Curie
+) tests pass

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
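// Illustrative usage sketch (not part of the original file; the concrete instantiation and the
// hierarchy-policy name below are assumptions, only the constructor and computeIndexInfoMapping
// defined in this file are taken as given):
//
//   typedef CClientClientDHTTemplate<int, SomeHierarchyPolicy> CClientClientDHTInt;
//   CClientClientDHTInt::Index2InfoTypeMap indexToServer;
//   indexToServer[globalIndex] = serverRank;     // info (e.g. server rank) attached to an index this process holds
//   CClientClientDHTInt dht(indexToServer, clientIntraComm);
//   dht.computeIndexInfoMapping(queriedIndices); // ask the DHT which info is attached to arbitrary indices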
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
/*!
  Constructor with initial distribution information and the corresponding index
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

/*!
  Constructor with initial distribution information and the corresponding index
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
    Compute mapping between indices and information corresponding to these indices
for each level of hierarchical DHT. Recursive function
   \param [in] indices indices a proc has
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping servers are on other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
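  // First pass: count, for each destination client, how many unique indices will be sent.
  // The owner of an index is found by hashing it and locating the hashed value in the
  // hashedIndex partition via upper_bound; nbIndices also acts as a "seen" set to skip duplicates.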
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

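  // Exchange, at this level of the hierarchy, which ranks will send to this process and how
  // many indices each of them will send (recvRankClient / recvNbIndexClientCount), based on
  // the per-destination flags (sendBuff) and counts (sendNbIndexBuff) computed above.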
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<MPI_Request> request;
  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
                           itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // OK, we go to the next level and do something recursive
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // Now, we are in the last level where necessary mappings are.
    indexToInfoMappingLevel_= (index2InfoMapping_);

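  // Return phase: this process is the owner of the indices it has just received. Look each of
  // them up in indexToInfoMappingLevel_ and count how many (index, info) pairs must be sent back
  // to every requesting client; an index missing from the map simply contributes nothing.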
  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

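  // Unpack the answers: for every received index, deserialize the matching info element(s) from
  // the byte buffer and rebuild the local index -> info mapping, which then replaces
  // indexToInfoMappingLevel_ for this level.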
  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                               it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                            it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}

/*!
  Compute the hash index distribution of the whole size_t space; each client will own a range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute range of hash index for each client
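  // hashedIndex ends up with nbClient+1 boundaries: client i owns hashed values in
  // [hashedIndex[i], hashedIndex[i+1]), roughly max/nbClient values per client with the
  // remainder spread over the first clients (illustrative: for nbClient == 2 the split is
  // approximately [0, max/2) and [max/2, max]).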
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}

/*!
  Compute distribution of global index for servers
  Each client already holds a piece of information and its associated index.
This information will be redistributed among processes by projecting indices into size_t space;
the corresponding information will also be distributed over size_t space.
After the redistribution, each client holds a rearranged index and its corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute size of sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

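  // Second pass: append each index to the buffer of its destination client and serialize the
  // attached info elements into the matching byte buffer (sendNbInfo tracks the byte offset,
  // sendNbIndexBuff the number of indices written per destination).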
  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Calculate from how many clients each client receives messages.
  // Calculate the size of the buffer for receiving messages
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and their corresponding information which do not belong to it,
  // it will send a message to the correct clients.
  // The contents of the message are the indices and their corresponding information
  std::vector<MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);

  std::vector<MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    size_t index;
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                               it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Ok, now do something recursive
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}

/*!
  Send message containing index to clients
  \param [in] clientDestRank rank of destination client
  \param [in] indices index to send
  \param [in] indiceSize size of index array to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendIndex list of sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const MPI_Comm& clientIntraComm,
                                                       std::vector<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive message containing indices from another client
  \param [in] clientSrcRank rank of source client
  \param [in] indices index array to receive
  \param [in] indiceSize size of index array to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const MPI_Comm& clientIntraComm,
                                                         std::vector<MPI_Request>& requestRecvIndex)
{
  MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send message containing information to clients
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize info array size to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendInfo list of sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const MPI_Comm& clientIntraComm,
                                                      std::vector<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);

  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive message containing information from another client
  \param [in] clientSrcRank rank of source client
  \param [in] info info array to receive
  \param [in] infoSize info array size to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const MPI_Comm& clientIntraComm,
                                                        std::vector<MPI_Request>& requestRecvInfo)
{
  MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is only based on the hierarchical structure of the distributed hash table (e.g. number of processes)
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int currentGroup, offset;
  size_t e,f;

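  // [a, b] is the hash range owned by this rank inside its own group at this level;
  // [c, d] below is the range owned by a member of a parent group (used to decide which ranks
  // will send to this rank), and [e, f] is this rank's range in the parent partition (used to
  // decide which ranks of the current group this rank must send to).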
  // Do simple math: intersect [a, b] with [c, d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}

/*!
  Compute the number of clients as well as the corresponding number of elements each client will receive when returning the search result
  \param [in] sendNbRank Ranks of clients to send to
  \param [in] sendNbElements Number of elements to send to each client
  \param [in] recvNbRank Ranks of clients to receive from
  \param [out] recvNbElements Number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to as well as the number
  of indices it will receive during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank Ranks of clients to send to
  \param [in] sendNbElements Number of elements to send to each client
  \param [out] recvNbRank Ranks of clients to receive from
  \param [out] recvNbElements Number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

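  // Each message to a potential receiver is a pair of ints: a flag telling whether this rank
  // actually has indices for it (sendNbRank holds the 0/1 flags computed by the caller) and the
  // number of elements it should expect; only pairs with a non-zero flag are kept on reception.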
  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}

}