source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 1639

Last change on this file since 1639 was 1639, checked in by yushan, 6 years ago

revert erroneous commit on trunk

File size: 29.3 KB
Line 
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 Mars 2016
6
7   \brief Distributed hashed table implementation.
8 */
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12
13namespace xios
14{
15template<typename T, typename H>
16CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const MPI_Comm& clientIntraComm)
17  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
18{
19  MPI_Comm_size(clientIntraComm, &nbClient_);
20  this->computeMPICommLevel();
21  int nbLvl = this->getNbLevel();
22  sendRank_.resize(nbLvl);
23  recvRank_.resize(nbLvl);
24}
25
26/*!
27  Constructor with initial distribution information and the corresponding index
28  Each client (process) holds a piece of information as well as the attached index, the index
29will be redistributed (projected) into size_t space as long as the associated information.
30  \param [in] indexInfoMap initial index and information mapping
31  \param [in] clientIntraComm communicator of clients
32  \param [in] hierarLvl level of hierarchy
33*/
34template<typename T, typename H>
35CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
36                                                        const MPI_Comm& clientIntraComm)
37  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
38{
39  MPI_Comm_size(clientIntraComm, &nbClient_);
40  this->computeMPICommLevel();
41  int nbLvl = this->getNbLevel();
42  sendRank_.resize(nbLvl);
43  recvRank_.resize(nbLvl);
44  Index2VectorInfoTypeMap indexToVecInfoMap;
45  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
46  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
47  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
48  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
49}
50
51/*!
52  Constructor with initial distribution information and the corresponding index
53  Each client (process) holds a piece of information as well as the attached index, the index
54will be redistributed (projected) into size_t space as long as the associated information.
55  \param [in] indexInfoMap initial index and information mapping
56  \param [in] clientIntraComm communicator of clients
57  \param [in] hierarLvl level of hierarchy
58*/
59template<typename T, typename H>
60CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
61                                                        const MPI_Comm& clientIntraComm)
62  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
63{
64  MPI_Comm_size(clientIntraComm, &nbClient_);
65  this->computeMPICommLevel();
66  int nbLvl = this->getNbLevel();
67  sendRank_.resize(nbLvl);
68  recvRank_.resize(nbLvl);
69  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
70}
71
72template<typename T, typename H>
73CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
74{
75}
76
77/*!
78  Compute mapping between indices and information corresponding to these indices
79  \param [in] indices indices a proc has
80*/
81template<typename T, typename H>
82void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
83{
84  int nbLvl = this->getNbLevel();
85  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
86}
87
88/*!
89    Compute mapping between indices and information corresponding to these indices
90for each level of hierarchical DHT. Recursive function
91   \param [in] indices indices a proc has
92   \param [in] commLevel communicator of current level
93   \param [in] level current level
94*/
95template<typename T, typename H>
96void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
97                                                                 const MPI_Comm& commLevel,
98                                                                 int level)
99{
100  int clientRank;
101  MPI_Comm_rank(commLevel,&clientRank);
102  int groupRankBegin = this->getGroupBegin()[level];
103  int nbClient = this->getNbInGroup()[level];
104  std::vector<size_t> hashedIndex;
105  computeHashIndex(hashedIndex, nbClient);
106
107  size_t ssize = indices.numElements(), hashedVal;
108
109  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
110                                      iteClientHash = hashedIndex.end();
111  std::vector<int> sendBuff(nbClient,0);
112  std::vector<int> sendNbIndexBuff(nbClient,0);
113
114  // Number of global index whose mapping server are on other clients
115  int nbIndexToSend = 0;
116  size_t index;
117  HashXIOS<size_t> hashGlobalIndex;
118  std::unordered_map<size_t,int> nbIndices;
119  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
120  for (int i = 0; i < ssize; ++i)
121  {
122    index = indices(i);
123    if (0 == nbIndices.count(index))
124    {
125      hashedVal  = hashGlobalIndex(index);
126      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
127      int indexClient = std::distance(itbClientHash, itClientHash)-1;
128      ++sendNbIndexBuff[indexClient];
129      nbIndices[index] = 1;
130    }
131  }
132
133  std::unordered_map<int, size_t* > client2ClientIndex;
134  for (int idx = 0; idx < nbClient; ++idx)
135  {
136    if (0 != sendNbIndexBuff[idx])
137    {
138      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
139      nbIndexToSend += sendNbIndexBuff[idx];
140      sendBuff[idx] = 1;
141      sendNbIndexBuff[idx] = 0;
142    }
143  }
144
145  for (int i = 0; i < ssize; ++i)
146  {
147    index = indices(i);
148    if (1 == nbIndices[index])
149    {
150      hashedVal  = hashGlobalIndex(index);
151      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
152      int indexClient = std::distance(itbClientHash, itClientHash)-1;
153      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
154      ++sendNbIndexBuff[indexClient];
155      ++nbIndices[index];
156    }
157  }
158
159  std::vector<int> recvRankClient, recvNbIndexClientCount;
160  sendRecvRank(level, sendBuff, sendNbIndexBuff,
161               recvRankClient, recvNbIndexClientCount);
162
163  int recvNbIndexCount = 0;
164  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
165    recvNbIndexCount += recvNbIndexClientCount[idx];
166
167  unsigned long* recvIndexBuff;
168  if (0 != recvNbIndexCount)
169    recvIndexBuff = new unsigned long[recvNbIndexCount];
170
171  std::vector<MPI_Request> request;
172  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
173                             iteRecvIndex = recvRankClient.end(),
174                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
175                           itRecvNbIndex;
176  int currentIndex = 0;
177  int nbRecvClient = recvRankClient.size();
178  for (int idx = 0; idx < nbRecvClient; ++idx)
179  {
180    if (0 != recvNbIndexClientCount[idx])
181      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
182    currentIndex += recvNbIndexClientCount[idx];
183  }
184
185  std::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
186                                                iteIndex = client2ClientIndex.end();
187  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
188    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
189
190  std::vector<MPI_Status> status(request.size());
191  MPI_Waitall(request.size(), &request[0], &status[0]);
192
193  CArray<size_t,1>* tmpGlobalIndex;
194  if (0 != recvNbIndexCount)
195    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
196  else
197    tmpGlobalIndex = new CArray<size_t,1>();
198
199  // OK, we go to the next level and do something recursive
200  if (0 < level)
201  {
202    --level;
203    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
204  }
205  else // Now, we are in the last level where necessary mappings are.
206    indexToInfoMappingLevel_= (index2InfoMapping_);
207
208  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
209  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
210  currentIndex = 0;
211  for (int idx = 0; idx < nbRecvClient; ++idx)
212  {
213    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
214    {
215      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
216      if (iteIndexToInfoMap != itIndexToInfoMap)
217        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
218    }
219    currentIndex += recvNbIndexClientCount[idx];
220  }
221
222  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
223  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
224  int indexIndex = 0;
225  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
226  {
227    recvRankOnReturn[indexIndex] = itIndex->first;
228  }
229  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
230                   recvRankOnReturn, recvNbIndexOnReturn);
231
232  int recvNbIndexCountOnReturn = 0;
233  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
234    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
235
236  unsigned long* recvIndexBuffOnReturn;
237  unsigned char* recvInfoBuffOnReturn;
238  if (0 != recvNbIndexCountOnReturn)
239  {
240    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
241    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
242  }
243
244  std::vector<MPI_Request> requestOnReturn;
245  currentIndex = 0;
246  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
247  {
248    if (0 != recvNbIndexOnReturn[idx])
249    {
250      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
251      recvInfoFromClients(recvRankOnReturn[idx],
252                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
253                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
254                          commLevel, requestOnReturn);
255    }
256    currentIndex += recvNbIndexOnReturn[idx];
257  }
258
259  std::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
260  std::unordered_map<int,size_t*> client2ClientIndexOnReturn;
261  currentIndex = 0;
262  for (int idx = 0; idx < nbRecvClient; ++idx)
263  {
264    if (0 != sendNbIndexOnReturn[idx])
265    {
266      int rank = recvRankClient[idx];
267      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
268      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
269      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
270      int infoIndex = 0;
271      int nb = 0;
272      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
273      {
274        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
275        if (iteIndexToInfoMap != itIndexToInfoMap)
276        {
277          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
278          for (int k = 0; k < infoTmp.size(); ++k)
279          {
280            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
281            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
282            ++nb;
283          }
284        }
285      }
286
287      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
288                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
289      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
290                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
291    }
292    currentIndex += recvNbIndexClientCount[idx];
293  }
294
295  std::vector<MPI_Status> statusOnReturn(requestOnReturn.size());
296  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
297
298  Index2VectorInfoTypeMap indexToInfoMapping;
299  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
300  int infoIndex = 0;
301  InfoType unpackedInfo;
302  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
303  {
304    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
305    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
306  }
307
308  indexToInfoMappingLevel_.swap(indexToInfoMapping);
309  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
310  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
311                                                        it != client2ClientIndex.end(); ++it)
312      delete [] it->second;
313  delete tmpGlobalIndex;
314
315  if (0 != recvNbIndexCountOnReturn)
316  {
317    delete [] recvIndexBuffOnReturn;
318    delete [] recvInfoBuffOnReturn;
319  }
320
321  for (std::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
322                                                               it != client2ClientInfoOnReturn.end(); ++it)
323      delete [] it->second;
324
325  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
326                                            it != client2ClientIndexOnReturn.end(); ++it)
327      delete [] it->second;
328}
329
330/*!
331  Compute the hash index distribution of whole size_t space then each client will have a range of this distribution
332*/
333template<typename T, typename H>
334void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
335{
336  // Compute range of hash index for each client
337  hashedIndex.resize(nbClient+1);
338  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
339  size_t nbHashIndex;
340  hashedIndex[0] = 0;
341  for (int i = 1; i < nbClient; ++i)
342  {
343    nbHashIndex = nbHashIndexMax / nbClient;
344    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
345    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
346  }
347  hashedIndex[nbClient] = nbHashIndexMax;
348}
349
350/*!
351  Compute distribution of global index for servers
352  Each client already holds a piece of information and its associated index.
353This information will be redistributed among processes by projecting indices into size_t space,
354the corresponding information will be also distributed on size_t space.
355After the redistribution, each client holds rearranged index and its corresponding information.
356  \param [in] indexInfoMap index and its corresponding info (usually server index)
357  \param [in] commLevel communicator of current level
358  \param [in] level current level
359*/
360template<typename T, typename H>
361void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
362                                                            const MPI_Comm& commLevel,
363                                                            int level)
364{
365  int clientRank;
366  MPI_Comm_rank(commLevel,&clientRank);
367  computeSendRecvRank(level, clientRank);
368
369  int groupRankBegin = this->getGroupBegin()[level];
370  int nbClient = this->getNbInGroup()[level];
371  std::vector<size_t> hashedIndex;
372  computeHashIndex(hashedIndex, nbClient);
373
374  std::vector<int> sendBuff(nbClient,0);
375  std::vector<int> sendNbIndexBuff(nbClient,0);
376  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
377                                      iteClientHash = hashedIndex.end();
378  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
379                                                   ite = indexInfoMap.end();
380  HashXIOS<size_t> hashGlobalIndex;
381
382  // Compute size of sending and receving buffer
383  for (it = itb; it != ite; ++it)
384  {
385    size_t hashIndex = hashGlobalIndex(it->first);
386    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
387    int indexClient = std::distance(itbClientHash, itClientHash)-1;
388    sendNbIndexBuff[indexClient] += it->second.size();
389  }
390
391  std::unordered_map<int, size_t*> client2ClientIndex;
392  std::unordered_map<int, unsigned char*> client2ClientInfo;
393  for (int idx = 0; idx < nbClient; ++idx)
394  {
395    if (0 != sendNbIndexBuff[idx])
396    {
397      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
398      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
399      sendNbIndexBuff[idx] = 0;
400      sendBuff[idx] = 1;
401    }
402  }
403
404  std::vector<int> sendNbInfo(nbClient,0);
405  for (it = itb; it != ite; ++it)
406  {
407    const std::vector<InfoType>& infoTmp = it->second;
408    size_t hashIndex = hashGlobalIndex(it->first);
409    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
410    int indexClient = std::distance(itbClientHash, itClientHash)-1;
411    for (int idx = 0; idx < infoTmp.size(); ++idx)
412    {
413      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;;
414  //          ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
415      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
416      ++sendNbIndexBuff[indexClient];
417    }
418  }
419
420  // Calculate from how many clients each client receive message.
421  // Calculate size of buffer for receiving message
422  std::vector<int> recvRankClient, recvNbIndexClientCount;
423  sendRecvRank(level, sendBuff, sendNbIndexBuff,
424               recvRankClient, recvNbIndexClientCount);
425
426  int recvNbIndexCount = 0;
427  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
428    recvNbIndexCount += recvNbIndexClientCount[idx];
429
430  unsigned long* recvIndexBuff;
431  unsigned char* recvInfoBuff;
432  if (0 != recvNbIndexCount)
433  {
434    recvIndexBuff = new unsigned long[recvNbIndexCount];
435    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
436  }
437
438  // If a client holds information about index and the corresponding which don't belong to it,
439  // it will send a message to the correct clients.
440  // Contents of the message are index and its corresponding informatioin
441  std::vector<MPI_Request> request;
442  int currentIndex = 0;
443  int nbRecvClient = recvRankClient.size();
444  for (int idx = 0; idx < nbRecvClient; ++idx)
445  {
446    if (0 != recvNbIndexClientCount[idx])
447    {
448      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
449      recvInfoFromClients(recvRankClient[idx],
450                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
451                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
452                          commLevel, request);
453    }
454    currentIndex += recvNbIndexClientCount[idx];
455  }
456
457  std::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
458                                                iteIndex = client2ClientIndex.end();
459  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
460    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
461  std::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
462                                                      iteInfo = client2ClientInfo.end();
463  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
464    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
465
466  std::vector<MPI_Status> status(request.size());
467  MPI_Waitall(request.size(), &request[0], &status[0]);
468
469  Index2VectorInfoTypeMap indexToInfoMapping;
470  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
471  currentIndex = 0;
472  InfoType infoValue;
473  int infoIndex = 0;
474  unsigned char* infoBuff = recvInfoBuff;
475  for (int idx = 0; idx < nbRecvClient; ++idx)
476  {
477    size_t index;
478    int count = recvNbIndexClientCount[idx];
479    for (int i = 0; i < count; ++i)
480    {
481      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
482      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
483    }
484    currentIndex += count;
485  }
486
487  if (0 != recvNbIndexCount)
488  {
489    delete [] recvIndexBuff;
490    delete [] recvInfoBuff;
491  }
492  for (std::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
493                                                               it != client2ClientInfo.end(); ++it)
494      delete [] it->second;
495
496  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
497                                                        it != client2ClientIndex.end(); ++it)
498      delete [] it->second;
499
500  // Ok, now do something recursive
501  if (0 < level)
502  {
503    --level;
504    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
505  }
506  else
507    index2InfoMapping_.swap(indexToInfoMapping);
508}
509
510/*!
511  Send message containing index to clients
512  \param [in] clientDestRank rank of destination client
513  \param [in] indices index to send
514  \param [in] indiceSize size of index array to send
515  \param [in] clientIntraComm communication group of client
516  \param [in] requestSendIndex list of sending request
517*/
518template<typename T, typename H>
519void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
520                                                       const MPI_Comm& clientIntraComm,
521                                                       std::vector<MPI_Request>& requestSendIndex)
522{
523  MPI_Request request;
524  requestSendIndex.push_back(request);
525  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
526            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
527}
528
529/*!
530  Receive message containing index to clients
531  \param [in] clientDestRank rank of destination client
532  \param [in] indices index to send
533  \param [in] clientIntraComm communication group of client
534  \param [in] requestRecvIndex list of receiving request
535*/
536template<typename T, typename H>
537void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
538                                                         const MPI_Comm& clientIntraComm,
539                                                         std::vector<MPI_Request>& requestRecvIndex)
540{
541  MPI_Request request;
542  requestRecvIndex.push_back(request);
543  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
544            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
545}
546
547/*!
548  Send message containing information to clients
549  \param [in] clientDestRank rank of destination client
550  \param [in] info info array to send
551  \param [in] infoSize info array size to send
552  \param [in] clientIntraComm communication group of client
553  \param [in] requestSendInfo list of sending request
554*/
555template<typename T, typename H>
556void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
557                                                      const MPI_Comm& clientIntraComm,
558                                                      std::vector<MPI_Request>& requestSendInfo)
559{
560  MPI_Request request;
561  requestSendInfo.push_back(request);
562
563  MPI_Isend(info, infoSize, MPI_CHAR,
564            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
565}
566
567/*!
568  Receive message containing information from other clients
569  \param [in] clientDestRank rank of destination client
570  \param [in] info info array to receive
571  \param [in] infoSize info array size to receive
572  \param [in] clientIntraComm communication group of client
573  \param [in] requestRecvInfo list of sending request
574*/
575template<typename T, typename H>
576void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
577                                                        const MPI_Comm& clientIntraComm,
578                                                        std::vector<MPI_Request>& requestRecvInfo)
579{
580  MPI_Request request;
581  requestRecvInfo.push_back(request);
582
583  MPI_Irecv(info, infoSize, MPI_CHAR,
584            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
585}
586
587/*!
588  Compute how many processes one process needs to send to and from how many processes it will receive.
589  This computation is only based on hierachical structure of distributed hash table (e.x: number of processes)
590*/
591template<typename T, typename H>
592void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
593{
594  int groupBegin = this->getGroupBegin()[level];
595  int nbInGroup  = this->getNbInGroup()[level];
596  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
597  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
598
599  std::vector<size_t> hashedIndexGroup;
600  computeHashIndex(hashedIndexGroup, nbInGroup);
601  size_t a = hashedIndexGroup[rank-groupBegin];
602  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
603
604  int currentGroup, offset;
605  size_t e,f;
606
607  // Do a simple math [a,b) intersect [c,d)
608  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
609  {
610    std::vector<size_t> hashedIndexGroupParent;
611    int nbInGroupParent = nbInGroupParents[idx];
612    if (0 != nbInGroupParent)
613      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
614    for (int i = 0; i < nbInGroupParent; ++i)
615    {
616      size_t c = hashedIndexGroupParent[i];
617      size_t d = hashedIndexGroupParent[i+1]-1;
618
619    if (!((d < a) || (b <c)))
620        recvRank_[level].push_back(groupParentBegin[idx]+i);
621    }
622
623    offset = rank - groupParentBegin[idx];
624    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
625    {
626      e = hashedIndexGroupParent[offset];
627      f = hashedIndexGroupParent[offset+1]-1;
628    }
629  }
630
631  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
632                                      iteHashGroup = hashedIndexGroup.end();
633  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
634  int begin = std::distance(itbHashGroup, itHashGroup)-1;
635  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
636  int end = std::distance(itbHashGroup, itHashGroup) -1;
637  sendRank_[level].resize(end-begin+1);
638  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
639}
640
641/*!
642  Compute number of clients as well as corresponding number of elements each client will receive on returning searching result
643  \param [in] sendNbRank Rank of clients to send to
644  \param [in] sendNbElements Number of elements each client to send to
645  \param [in] receiveNbRank Rank of clients to receive from
646  \param [out] recvNbElements Number of elements each client to send to
647*/
648template<typename T, typename H>
649void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
650                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
651{
652  recvNbElements.resize(recvNbRank.size());
653  std::vector<MPI_Request> request(sendNbRank.size()+recvNbRank.size());
654  std::vector<MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
655
656  int nRequest = 0;
657  for (int idx = 0; idx < recvNbRank.size(); ++idx)
658  {
659    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
660              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
661    ++nRequest;
662  }
663
664  for (int idx = 0; idx < sendNbRank.size(); ++idx)
665  {
666    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
667              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
668    ++nRequest;
669  }
670
671  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
672}
673
674/*!
675  Send and receive number of process each process need to listen to as well as number
676  of index it will receive during the initalization phase
677  \param [in] level current level
678  \param [in] sendNbRank Rank of clients to send to
679  \param [in] sendNbElements Number of elements each client to send to
680  \param [out] receiveNbRank Rank of clients to receive from
681  \param [out] recvNbElements Number of elements each client to send to
682*/
683template<typename T, typename H>
684void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
685                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
686                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
687{
688  int groupBegin = this->getGroupBegin()[level];
689
690  int offSet = 0;
691  std::vector<int>& sendRank = sendRank_[level];
692  std::vector<int>& recvRank = recvRank_[level];
693  int sendBuffSize = sendRank.size();
694  std::vector<int> sendBuff(sendBuffSize*2);
695  int recvBuffSize = recvRank.size();
696  std::vector<int> recvBuff(recvBuffSize*2,0);
697
698  std::vector<MPI_Request> request(sendBuffSize+recvBuffSize);
699  std::vector<MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
700
701  int nRequest = 0;
702  for (int idx = 0; idx < recvBuffSize; ++idx)
703  {
704    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
705              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
706    ++nRequest;
707  }
708
709  for (int idx = 0; idx < sendBuffSize; ++idx)
710  {
711    offSet = sendRank[idx]-groupBegin;
712    sendBuff[idx*2] = sendNbRank[offSet];
713    sendBuff[idx*2+1] = sendNbElements[offSet];
714  }
715
716  for (int idx = 0; idx < sendBuffSize; ++idx)
717  {
718    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
719              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
720    ++nRequest;
721  }
722
723  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
724  int nbRecvRank = 0, nbRecvElements = 0;
725  recvNbRank.clear();
726  recvNbElements.clear();
727  for (int idx = 0; idx < recvBuffSize; ++idx)
728  {
729    if (0 != recvBuff[2*idx])
730    {
731      recvNbRank.push_back(recvRank[idx]);
732      recvNbElements.push_back(recvBuff[2*idx+1]);
733    }
734  }
735}
736
737}
Note: See TracBrowser for help on using the repository browser.