source: XIOS/dev/branch_yushan/src/client_client_dht_template_impl.hpp @ 1037

Last change on this file since 1037 was 1037, checked in by yushan, 7 years ago

initialize the branch

1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 March 2016
6
7   \brief Distributed hash table implementation.
8 */
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12#ifdef _usingEP
13#include "ep_declaration.hpp"
14#endif
15
16
17namespace xios
18{
19template<typename T, typename H>
20CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const MPI_Comm& clientIntraComm)
21  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
22{
23  MPI_Comm_size(clientIntraComm, &nbClient_);
24  this->computeMPICommLevel();
25  int nbLvl = this->getNbLevel();
26  sendRank_.resize(nbLvl);
27  recvRank_.resize(nbLvl);
28}
29
30/*!
31  Constructor with initial distribution information and the corresponding indices.
32  Each client (process) holds a piece of information as well as the attached indices; the indices
33will be redistributed (projected) into the size_t space along with the associated information.
34  \param [in] indexInfoMap initial index and information mapping
35  \param [in] clientIntraComm communicator of clients
37*/
38template<typename T, typename H>
39CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
40                                                        const MPI_Comm& clientIntraComm)
41  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
42{
43 
44  MPI_Comm_size(clientIntraComm, &nbClient_);
45  this->computeMPICommLevel();
46  int nbLvl = this->getNbLevel(); 
47  sendRank_.resize(nbLvl);
48  recvRank_.resize(nbLvl);
49  Index2VectorInfoTypeMap indexToVecInfoMap;
50  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor())); 
51  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end(); 
52  for (; it != ite; ++it)
53  {
54    indexToVecInfoMap[it->first].push_back(it->second);
55  } 
56 
57  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1); 
58}
59
60/*!
61  Constructor with initial distribution information and the corresponding indices.
62  Each client (process) holds a piece of information as well as the attached indices; the indices
63will be redistributed (projected) into the size_t space along with the associated information.
64  \param [in] indexInfoMap initial index and information mapping
65  \param [in] clientIntraComm communicator of clients
67*/
68template<typename T, typename H>
69CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
70                                                        const MPI_Comm& clientIntraComm)
71  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
72{
73  MPI_Comm_size(clientIntraComm, &nbClient_);
74  this->computeMPICommLevel();
75  int nbLvl = this->getNbLevel();
76  sendRank_.resize(nbLvl);
77  recvRank_.resize(nbLvl);
78  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
79}
80
81template<typename T, typename H>
82CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
83{
84}
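
/*!
  Illustrative usage sketch (not part of the library). Each process registers the
  (index, info) pairs it owns through the constructor, then queries the DHT for
  indices owned by other processes with computeIndexInfoMapping(). The typedef
  CClientClientDHTInt, the communicator variable clientIntraComm and the chosen
  index values are assumptions made for this example; the lookup result ends up in
  indexToInfoMappingLevel_, which the class header is assumed to expose through a getter.
  \code
  int commRank, commSize;
  MPI_Comm_rank(clientIntraComm, &commRank);
  MPI_Comm_size(clientIntraComm, &commSize);

  // Register one locally owned global index; the attached info is simply the rank here
  CClientClientDHTInt::Index2InfoTypeMap localIndexInfo;
  localIndexInfo[static_cast<size_t>(commRank)] = commRank;

  CClientClientDHTInt dht(localIndexInfo, clientIntraComm);

  // Look up the index that was registered by the "next" rank
  CArray<size_t,1> wanted(1);
  wanted(0) = static_cast<size_t>((commRank + 1) % commSize);
  dht.computeIndexInfoMapping(wanted);
  \endcode
*/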
85
86/*!
87  Compute mapping between indices and information corresponding to these indices
88  \param [in] indices indices a proc has
89*/
90template<typename T, typename H>
91void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
92{
93  int nbLvl = this->getNbLevel();
94  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
95}
96
97/*!
98    Compute mapping between indices and information corresponding to these indices
99for each level of hierarchical DHT. Recursive function
100   \param [in] indices indices a proc has
101   \param [in] commLevel communicator of current level
102   \param [in] level current level
103*/
104template<typename T, typename H>
105void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
106                                                                 const MPI_Comm& commLevel,
107                                                                 int level)
108{
109  int clientRank;
110  MPI_Comm_rank(commLevel,&clientRank);
111  int groupRankBegin = this->getGroupBegin()[level];
112  int nbClient = this->getNbInGroup()[level];
113  std::vector<size_t> hashedIndex;
114  computeHashIndex(hashedIndex, nbClient);
115
116  size_t ssize = indices.numElements(), hashedVal;
117
118  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
119                                      iteClientHash = hashedIndex.end();
120  std::vector<int> sendBuff(nbClient,0);
121  std::vector<int> sendNbIndexBuff(nbClient,0);
122
123  // Number of global indices whose mapping servers are on other clients
124  int nbIndexToSend = 0;
125  size_t index;
126  HashXIOS<size_t> hashGlobalIndex;
127  boost::unordered_map<size_t,int> nbIndices;
128  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
129  for (int i = 0; i < ssize; ++i)
130  {
131    index = indices(i);
132    if (0 == nbIndices.count(index))
133    {
134      hashedVal  = hashGlobalIndex(index);
135      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
136      int indexClient = std::distance(itbClientHash, itClientHash)-1;
137      ++sendNbIndexBuff[indexClient];
138      nbIndices[index] = 1;
139    }
140  }
141
142  boost::unordered_map<int, size_t* > client2ClientIndex;
143  for (int idx = 0; idx < nbClient; ++idx)
144  {
145    if (0 != sendNbIndexBuff[idx])
146    {
147      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
148      nbIndexToSend += sendNbIndexBuff[idx];
149      sendBuff[idx] = 1;
150      sendNbIndexBuff[idx] = 0;
151    }
152  }
153
154  for (int i = 0; i < ssize; ++i)
155  {
156    index = indices(i);
157    if (1 == nbIndices[index])
158    {
159      hashedVal  = hashGlobalIndex(index);
160      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
161      int indexClient = std::distance(itbClientHash, itClientHash)-1;
162      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
163      ++sendNbIndexBuff[indexClient];
164      ++nbIndices[index];
165    }
166  }
167
168  std::vector<int> recvRankClient, recvNbIndexClientCount;
169  sendRecvRank(level, sendBuff, sendNbIndexBuff,
170               recvRankClient, recvNbIndexClientCount);
171
172  int recvNbIndexCount = 0;
173  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
174    recvNbIndexCount += recvNbIndexClientCount[idx];
175
176  unsigned long* recvIndexBuff;
177  if (0 != recvNbIndexCount)
178    recvIndexBuff = new unsigned long[recvNbIndexCount];
179
180  std::vector<MPI_Request> request;
181  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
182                             iteRecvIndex = recvRankClient.end(),
183                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
184                           itRecvNbIndex;
185  int currentIndex = 0;
186  int nbRecvClient = recvRankClient.size();
187  for (int idx = 0; idx < nbRecvClient; ++idx)
188  {
189    if (0 != recvNbIndexClientCount[idx])
190    {
191      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
192    }
193    currentIndex += recvNbIndexClientCount[idx];
194  }
195
196  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
197                                                iteIndex = client2ClientIndex.end();
198  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
199    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
200
201  std::vector<MPI_Status> status(request.size());
202
203  //printf("1(%d): calling wait all for %lu requests\n", clientRank, request.size());
204
205  MPI_Waitall(request.size(), &request[0], &status[0]);
206
207
208  //printf("               1(%d): calling wait all for %lu requests OK\n", clientRank, request.size());
209
210  CArray<size_t,1>* tmpGlobalIndex;
211  if (0 != recvNbIndexCount)
212    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
213  else
214    tmpGlobalIndex = new CArray<size_t,1>();
215
216  // OK, we go to the next level and do something recursive
217  if (0 < level)
218  {
219    --level;
220    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
221  }
222  else // We are now at the last level, where the necessary mappings reside.
223    indexToInfoMappingLevel_= (index2InfoMapping_);
224
225  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
226  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
227  currentIndex = 0;
228  for (int idx = 0; idx < nbRecvClient; ++idx)
229  {
230    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
231    {
232      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
233      if (iteIndexToInfoMap != itIndexToInfoMap)
234        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
235    }
236    currentIndex += recvNbIndexClientCount[idx];
237  }
238
239  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
240  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
241  int indexIndex = 0;
242  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
243  {
244    recvRankOnReturn[indexIndex] = itIndex->first;
245  }
246  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
247                   recvRankOnReturn, recvNbIndexOnReturn);
248
249  int recvNbIndexCountOnReturn = 0;
250  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
251    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
252
253  unsigned long* recvIndexBuffOnReturn;
254  unsigned char* recvInfoBuffOnReturn;
255  if (0 != recvNbIndexCountOnReturn)
256  {
257    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
258    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
259  }
260
261  std::vector<MPI_Request> requestOnReturn;
262  currentIndex = 0;
263  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
264  {
265    if (0 != recvNbIndexOnReturn[idx])
266    {
267      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
268      recvInfoFromClients(recvRankOnReturn[idx],
269                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
270                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
271                          commLevel, requestOnReturn);
272    }
273    currentIndex += recvNbIndexOnReturn[idx];
274  }
275
276  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
277  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
278  currentIndex = 0;
279  for (int idx = 0; idx < nbRecvClient; ++idx)
280  {
281    if (0 != sendNbIndexOnReturn[idx])
282    {
283      int rank = recvRankClient[idx];
284      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
285      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
286      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
287      int infoIndex = 0;
288      int nb = 0;
289      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
290      {
291        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
292        if (iteIndexToInfoMap != itIndexToInfoMap)
293        {
294          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
295          for (int k = 0; k < infoTmp.size(); ++k)
296          {
297            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
298            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
299            ++nb;
300          }
301        }
302      }
303
304      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
305                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
306      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
307                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
308    }
309    currentIndex += recvNbIndexClientCount[idx];
310  }
311
312  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
313  //printf("2(%d): calling wait all for %lu requests\n", clientRank, requestOnReturn.size());
314  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
315
316  //printf("            2(%d): calling wait all for %lu requests OK\n", clientRank, requestOnReturn.size());
317
318  Index2VectorInfoTypeMap indexToInfoMapping;
319  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
320  int infoIndex = 0;
321  InfoType unpackedInfo;
322  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
323  {
324    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
325    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
326  }
327
328  indexToInfoMappingLevel_.swap(indexToInfoMapping);
329  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
330  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
331                                                        it != client2ClientIndex.end(); ++it)
332      delete [] it->second;
333  delete tmpGlobalIndex;
334
335  if (0 != recvNbIndexCountOnReturn)
336  {
337    delete [] recvIndexBuffOnReturn;
338    delete [] recvInfoBuffOnReturn;
339  }
340
341  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
342                                                               it != client2ClientInfoOnReturn.end(); ++it)
343      delete [] it->second;
344
345  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
346                                            it != client2ClientIndexOnReturn.end(); ++it)
347      delete [] it->second;
348}
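
/*!
  Hedged sketch of the packing convention relied upon above (an assumption drawn from
  how ProcessDHTElement is used in this file, not from its definition): elements are
  written contiguously into a raw char buffer of size n*typeSize(), with a running
  byte offset advanced by packElement/unpackElement.
  \code
  int offset = 0;
  unsigned char* buf = new unsigned char[2*ProcessDHTElement<int>::typeSize()];
  ProcessDHTElement<int>::packElement(7,  buf, offset);   // offset advances by typeSize()
  ProcessDHTElement<int>::packElement(11, buf, offset);

  int a, b;
  offset = 0;
  ProcessDHTElement<int>::unpackElement(a, buf, offset);  // a == 7
  ProcessDHTElement<int>::unpackElement(b, buf, offset);  // b == 11
  delete [] buf;
  \endcode
*/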
349
350/*!
351  Compute the hash index distribution over the whole size_t space; each client then owns one contiguous range of this distribution
352*/
353template<typename T, typename H>
354void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
355{
356  // Compute range of hash index for each client
357  hashedIndex.resize(nbClient+1);
358  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
359  size_t nbHashIndex;
360  hashedIndex[0] = 0;
361  for (int i = 1; i < nbClient; ++i)
362  {
363    nbHashIndex = nbHashIndexMax / nbClient;
364    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
365    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
366  }
367  hashedIndex[nbClient] = nbHashIndexMax;
368}
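
/*!
  Sketch of how the ranges produced by computeHashIndex are used elsewhere in this file
  to find the owner of a hashed index (illustrative only; variable names are placeholders):
  the owner of a hashed value h is the client whose range contains h, located with
  std::upper_bound exactly as in computeIndexInfoMappingLevel.
  \code
  std::vector<size_t> bounds;                 // bounds.size() == nbClient+1 after the call
  computeHashIndex(bounds, nbClient);         // bounds[0] == 0, bounds[nbClient] == max size_t
  size_t h = HashXIOS<size_t>()(globalIndex); // hash a global index into the size_t space
  int owner = std::distance(bounds.begin(),
                            std::upper_bound(bounds.begin(), bounds.end(), h)) - 1;
  // owner indexes into the level's group; adding groupRankBegin gives the MPI rank
  \endcode
*/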
369
370/*!
371  Compute the distribution of global indices for servers.
372  Each client already holds a piece of information and its associated indices.
373This information is redistributed among processes by projecting the indices into the size_t space;
374the corresponding information is distributed over the size_t space as well.
375After the redistribution, each client holds the rearranged indices and their corresponding information.
376  \param [in] indexInfoMap index and its corresponding info (usually server index)
377  \param [in] commLevel communicator of current level
378  \param [in] level current level
379*/
380template<typename T, typename H>
381void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
382                                                            const MPI_Comm& commLevel,
383                                                            int level)
384{
385  //printf("in computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap, const MPI_Comm& commLevel, int level)\n");
386  int clientRank;
387  MPI_Comm_rank(commLevel,&clientRank);
388  computeSendRecvRank(level, clientRank);
389
390  int groupRankBegin = this->getGroupBegin()[level];
391  int nbClient = this->getNbInGroup()[level];
392  std::vector<size_t> hashedIndex;
393  computeHashIndex(hashedIndex, nbClient);
394
395  std::vector<int> sendBuff(nbClient,0);
396  std::vector<int> sendNbIndexBuff(nbClient,0);
397  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
398                                      iteClientHash = hashedIndex.end();
399  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
400                                                   ite = indexInfoMap.end();
401  HashXIOS<size_t> hashGlobalIndex;
402
403  // Compute size of sending and receiving buffers
404  for (it = itb; it != ite; ++it)
405  {
406    size_t hashIndex = hashGlobalIndex(it->first);
407    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
408    int indexClient = std::distance(itbClientHash, itClientHash)-1;
409    sendNbIndexBuff[indexClient] += it->second.size();
410  }
411
412  boost::unordered_map<int, size_t*> client2ClientIndex;
413  boost::unordered_map<int, unsigned char*> client2ClientInfo;
414  for (int idx = 0; idx < nbClient; ++idx)
415  {
416    if (0 != sendNbIndexBuff[idx])
417    {
418      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
419      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
420      sendNbIndexBuff[idx] = 0;
421      sendBuff[idx] = 1;
422    }
423  }
424
425  std::vector<int> sendNbInfo(nbClient,0);
426  for (it = itb; it != ite; ++it)
427  {
428    const std::vector<InfoType>& infoTmp = it->second;
429    size_t hashIndex = hashGlobalIndex(it->first);
430    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
431    int indexClient = std::distance(itbClientHash, itClientHash)-1;
432    for (int idx = 0; idx < infoTmp.size(); ++idx)
433    {
434      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
435  //          ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
436      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
437      ++sendNbIndexBuff[indexClient];
438    }
439  }
440
441  //printf("check 4 OK. clientRank = %d\n", clientRank);
442
443  // Calculate how many clients each client receives messages from.
444  // Calculate the size of the buffer for receiving messages.
445  std::vector<int> recvRankClient, recvNbIndexClientCount;
446  sendRecvRank(level, sendBuff, sendNbIndexBuff,
447               recvRankClient, recvNbIndexClientCount);
448  //printf("sendRecvRank OK\n");
449
450  int recvNbIndexCount = 0;
451  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
452    recvNbIndexCount += recvNbIndexClientCount[idx];
453
454  unsigned long* recvIndexBuff;
455  unsigned char* recvInfoBuff;
456  if (0 != recvNbIndexCount)
457  {
458    recvIndexBuff = new unsigned long[recvNbIndexCount];
459    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
460  }
461
462  //printf("check 5 OK. clientRank = %d\n", clientRank);
463
464  // If a client holds indices and the corresponding information which do not belong to it,
465  // it will send a message to the correct clients.
466  // The contents of the message are the indices and their corresponding information.
467  std::vector<MPI_Request> request;
468  int currentIndex = 0;
469  int nbRecvClient = recvRankClient.size();
470  for (int idx = 0; idx < nbRecvClient; ++idx)
471  {
472    if (0 != recvNbIndexClientCount[idx])
473    {
474      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
475
476      recvInfoFromClients(recvRankClient[idx],
477                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
478                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
479                          commLevel, request);
480
481    }
482    currentIndex += recvNbIndexClientCount[idx];
483  }
484
485  //printf("check 6 OK. clientRank = %d\n", clientRank);
486
487  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
488                                                iteIndex = client2ClientIndex.end();
489  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
490  {
491    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
492
493  }
494
495  //printf("check 7 OK. clientRank = %d\n", clientRank);
496
497  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
498                                                      iteInfo = client2ClientInfo.end();
499  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
500  {
501    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
502
503  }
504
505  //printf("check 8 OK. clientRank = %d\n", clientRank);
506  std::vector<MPI_Status> status(request.size());
507
508  MPI_Waitall(request.size(), &request[0], &status[0]);
509
510  Index2VectorInfoTypeMap indexToInfoMapping;
511  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
512  currentIndex = 0;
513  InfoType infoValue;
514  int infoIndex = 0;
515  unsigned char* infoBuff = recvInfoBuff;
516  for (int idx = 0; idx < nbRecvClient; ++idx)
517  {
519    int count = recvNbIndexClientCount[idx];
520    for (int i = 0; i < count; ++i)
521    {
522      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
525      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
526    }
527    currentIndex += count;
528  }
529
530  //printf("check 9 OK. clientRank = %d\n", clientRank);
531
532  if (0 != recvNbIndexCount)
533  {
534    delete [] recvIndexBuff;
535    delete [] recvInfoBuff;
536  }
537  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
538                                                               it != client2ClientInfo.end(); ++it)
539      delete [] it->second;
540
541  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
542                                                        it != client2ClientIndex.end(); ++it)
543      delete [] it->second;
544
545  //printf("check 10 OK. clientRank = %d\n", clientRank);
546  // Ok, now do something recursive
547  if (0 < level)
548  {
549    --level;
550    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
551  }
552  else
553    index2InfoMapping_.swap(indexToInfoMapping);
554}
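
/*!
  The dispatch above follows a two-pass bucketing pattern; here it is reduced to a
  minimal sketch (illustrative only: items, dest() and the use of std::vector are
  assumptions; the real code fills raw new[] buffers keyed by destination rank, with
  dest() standing for the hashed-range lookup shown after computeHashIndex).
  \code
  // Pass 1: count how many items go to each destination rank
  std::vector<int> count(nbClient, 0);
  for (std::size_t i = 0; i < items.size(); ++i) ++count[dest(items[i])];

  // Allocate one bucket per destination that actually receives something
  std::vector<std::vector<size_t> > bucket(nbClient);
  for (int r = 0; r < nbClient; ++r) if (count[r]) bucket[r].reserve(count[r]);

  // Pass 2: copy each item into its destination bucket
  for (std::size_t i = 0; i < items.size(); ++i) bucket[dest(items[i])].push_back(items[i]);
  \endcode
*/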
555
556/*!
557  Send message containing indices to a client
558  \param [in] clientDestRank rank of destination client
559  \param [in] indices index array to send
560  \param [in] indiceSize size of index array to send
561  \param [in] clientIntraComm communication group of client
562  \param [in] requestSendIndex list of sending request
563*/
564template<typename T, typename H>
565void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
566                                                       const MPI_Comm& clientIntraComm,
567                                                       std::vector<MPI_Request>& requestSendIndex)
568{
569  MPI_Request request;
570  requestSendIndex.push_back(request);
571
572  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
573            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
574}
575
576/*!
577  Receive message containing indices from another client
578  \param [in] clientSrcRank rank of source client
579  \param [in] indices index array to receive (of size indiceSize)
580  \param [in] clientIntraComm communication group of client
581  \param [in] requestRecvIndex list of receiving requests
582*/
583template<typename T, typename H>
584void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
585                                                         const MPI_Comm& clientIntraComm,
586                                                         std::vector<MPI_Request>& requestRecvIndex)
587{
588  MPI_Request request;
589  requestRecvIndex.push_back(request);
590
591  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
592            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
593}
594
595/*!
596  Send message containing information to clients
597  \param [in] clientDestRank rank of destination client
598  \param [in] info info array to send
599  \param [in] infoSize info array size to send
600  \param [in] clientIntraComm communication group of client
601  \param [in] requestSendInfo list of sending request
602*/
603template<typename T, typename H>
604void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
605                                                      const MPI_Comm& clientIntraComm,
606                                                      std::vector<MPI_Request>& requestSendInfo)
607{
608  MPI_Request request;
609  requestSendInfo.push_back(request);
610  //printf("MPI_IsendInfo(info, infoSize, MPI_CHAR,... char count = %d, dest = %d, buf_size = %d\n", infoSize, clientDestRank, sizeof(*info) );
611  MPI_Isend(info, infoSize, MPI_CHAR,
612            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
613}
614
615/*!
616  Receive message containing information from other clients
617  \param [in] clientSrcRank rank of source client
618  \param [in] info info array to receive
619  \param [in] infoSize info array size to receive
620  \param [in] clientIntraComm communication group of client
621  \param [in] requestRecvInfo list of receiving requests
622*/
623template<typename T, typename H>
624void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
625                                                        const MPI_Comm& clientIntraComm,
626                                                        std::vector<MPI_Request>& requestRecvInfo)
627{
628  MPI_Request request;
629  requestRecvInfo.push_back(request);
630
631  MPI_Irecv(info, infoSize, MPI_CHAR,
632            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
633}
634
635/*!
636  Compute how many processes one process needs to send to and from how many processes it will receive.
637  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes)
638*/
639template<typename T, typename H>
640void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
641{
642  int groupBegin = this->getGroupBegin()[level];
643  int nbInGroup  = this->getNbInGroup()[level];
644  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
645  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
646
647  std::vector<size_t> hashedIndexGroup;
648  computeHashIndex(hashedIndexGroup, nbInGroup);
649  size_t a = hashedIndexGroup[rank-groupBegin];
650  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
651
652  int currentGroup, offset;
653  size_t e,f;
654
655  // Simple interval arithmetic: check whether [a,b] intersects [c,d]
656  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
657  {
658    std::vector<size_t> hashedIndexGroupParent;
659    int nbInGroupParent = nbInGroupParents[idx];
660    if (0 != nbInGroupParent)
661      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
662    for (int i = 0; i < nbInGroupParent; ++i)
663    {
664      size_t c = hashedIndexGroupParent[i];
665      size_t d = hashedIndexGroupParent[i+1]-1;
666
667      if (!((d < a) || (b < c)))
668        recvRank_[level].push_back(groupParentBegin[idx]+i);
669    }
670
671    offset = rank - groupParentBegin[idx];
672    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
673    {
674      e = hashedIndexGroupParent[offset];
675      f = hashedIndexGroupParent[offset+1]-1;
676    }
677  }
678
679  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
680                                      iteHashGroup = hashedIndexGroup.end();
681  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
682  int begin = std::distance(itbHashGroup, itHashGroup)-1;
683  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
684  int end = std::distance(itbHashGroup, itHashGroup) -1;
685  sendRank_[level].resize(end-begin+1);
686  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
687}
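
/*!
  The test used in the loop above, isolated for clarity (sketch): two closed intervals
  [a,b] and [c,d] intersect iff neither one ends before the other begins.
  \code
  inline bool intervalsOverlap(size_t a, size_t b, size_t c, size_t d)
  {
    return !((d < a) || (b < c));   // equivalently: (a <= d) && (c <= b)
  }
  \endcode
*/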
688
689/*!
690  Compute the number of clients, and the corresponding number of elements, each client will receive when returning the search results
691  \param [in] sendNbRank Ranks of clients to send to
692  \param [in] sendNbElements Number of elements to send to each client
693  \param [in] recvNbRank Ranks of clients to receive from
694  \param [out] recvNbElements Number of elements to receive from each client
695*/
696template<typename T, typename H>
697void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
698                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
699{
700  recvNbElements.resize(recvNbRank.size());
701  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
702  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
703
704  int nRequest = 0;
705  for (int idx = 0; idx < recvNbRank.size(); ++idx)
706  {
707    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
708              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
709    ++nRequest;
710  }
711
712  for (int idx = 0; idx < sendNbRank.size(); ++idx)
713  {
714    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
715              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
716    ++nRequest;
717  }
718 
719  int clientRank;
720  MPI_Comm_rank(this->internalComm_,&clientRank);
721  //printf("4(%d): calling wait all for %lu requests\n", clientRank, sendNbRank.size()+recvNbRank.size());
722  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
723  //printf("        4(%d): calling wait all for %lu requests OK\n", clientRank, sendNbRank.size()+recvNbRank.size());
724}
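
/*!
  Skeleton of the non-blocking exchange pattern used by sendRecvOnReturn and sendRecvRank
  (sketch; recvRanks, sendRanks, recvVals, sendVals, tag and comm are placeholders):
  post every receive, post every send, then wait on all requests before reading the
  receive buffers.
  \code
  std::vector<MPI_Request> req(recvRanks.size() + sendRanks.size());
  int n = 0;
  for (std::size_t i = 0; i < recvRanks.size(); ++i)
    MPI_Irecv(&recvVals[i], 1, MPI_INT, recvRanks[i], tag, comm, &req[n++]);
  for (std::size_t i = 0; i < sendRanks.size(); ++i)
    MPI_Isend(&sendVals[i], 1, MPI_INT, sendRanks[i], tag, comm, &req[n++]);

  std::vector<MPI_Status> sta(req.size());
  MPI_Waitall(static_cast<int>(req.size()), &req[0], &sta[0]);
  \endcode
*/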
725
726/*!
727  Send and receive the number of processes each process needs to listen to, as well as the number
728  of indices it will receive, during the initialization phase
729  \param [in] level current level
730  \param [in] sendNbRank Ranks of clients to send to
731  \param [in] sendNbElements Number of elements to send to each client
732  \param [out] recvNbRank Ranks of clients to receive from
733  \param [out] recvNbElements Number of elements to receive from each client
734*/
735template<typename T, typename H>
736void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
737                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
738                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
739{
740  int myRank;
741  MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
742  //printf("myRank = %d, in sendRecvRank(int level, const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements, std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)\n", myRank);
743  int groupBegin = this->getGroupBegin()[level];
744
745  int offSet = 0;
746  std::vector<int>& sendRank = sendRank_[level];
747  std::vector<int>& recvRank = recvRank_[level];
748  int sendBuffSize = sendRank.size();
749  std::vector<int> sendBuff(sendBuffSize*2);
750  int recvBuffSize = recvRank.size();
751  std::vector<int> recvBuff(recvBuffSize*2,0);
752
753  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
754  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
755
756  int nRequest = 0;
757  for (int idx = 0; idx < recvBuffSize; ++idx)
758  {
759    //printf("myRank = %d starts irecv with src = %d, tag = %d, idx = %d\n", myRank, recvRank[idx], MPI_DHT_INDEX_0, idx);
760    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
761              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
762    //printf("myRank = %d MPI_Irecv OK, idx = %d, nRequest = %d\n", myRank, idx, nRequest);
763    ++nRequest;
764  }
765
766  //printf("myRank = %d, check 1 OK\n", myRank);
767
768  for (int idx = 0; idx < sendBuffSize; ++idx)
769  {
770    offSet = sendRank[idx]-groupBegin;
771    sendBuff[idx*2] = sendNbRank[offSet];
772    sendBuff[idx*2+1] = sendNbElements[offSet];
773  }
774
775  for (int idx = 0; idx < sendBuffSize; ++idx)
776  {
777    //printf("myRank = %d starts isend with dest = %d, tag = %d, idx = %d\n", myRank, sendRank[idx], MPI_DHT_INDEX_0, idx);
778    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
779              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
780    //printf("myRank = %d MPI_Isend OK, idx = %d, nRequest = %d\n", myRank, idx, nRequest);
781    ++nRequest;
782  }
783
784  MPI_Barrier(this->internalComm_);
785
786  //printf("myRank = %d, check 2 OK\n", myRank);
787
788  int clientRank;
789  MPI_Comm_rank(this->internalComm_,&clientRank);
790
791  //printf("5(%d): calling wait all for %lu requests\n", myRank, sendBuffSize+recvBuffSize);
792  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
793  //printf("            5(%d): calling wait all for %lu requests OK\n", myRank, sendBuffSize+recvBuffSize);
794  //printf("check 3 OK\n");
795
796  int nbRecvRank = 0, nbRecvElements = 0;
797  recvNbRank.clear();
798  recvNbElements.clear();
799  for (int idx = 0; idx < recvBuffSize; ++idx)
800  {
801    if (0 != recvBuff[2*idx])
802    {
803      recvNbRank.push_back(recvRank[idx]);
804      recvNbElements.push_back(recvBuff[2*idx+1]);
805    }
806  }
807  //printf("check 4 OK\n");
808}
809
810}