source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 1638

Last change on this file since 1638 was 1638, checked in by yushan, 6 years ago

dev on ADA

File size: 34.4 KB
Line 
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 Mars 2016
6
7   \brief Distributed hashed table implementation.
8 */
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12
13namespace xios
14{
15template<typename T, typename H>
16CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
17  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
18{
19  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
20  this->computeMPICommLevel();
21  int nbLvl = this->getNbLevel();
22  sendRank_.resize(nbLvl);
23  recvRank_.resize(nbLvl);
24}
25
26/*!
27  Constructor with initial distribution information and the corresponding index
28  Each client (process) holds a piece of information as well as the attached index, the index
29will be redistributed (projected) into size_t space as long as the associated information.
30  \param [in] indexInfoMap initial index and information mapping
31  \param [in] clientIntraComm communicator of clients
32  \param [in] hierarLvl level of hierarchy
33*/
34template<typename T, typename H>
35CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
36                                                        const ep_lib::MPI_Comm& clientIntraComm)
37  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
38{
39  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
40  this->computeMPICommLevel();
41  int nbLvl = this->getNbLevel();
42  sendRank_.resize(nbLvl);
43  recvRank_.resize(nbLvl);
44  Index2VectorInfoTypeMap indexToVecInfoMap;
45  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
46  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
47  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
48  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
49}
50
51/*!
52  Constructor with initial distribution information and the corresponding index
53  Each client (process) holds a piece of information as well as the attached index, the index
54will be redistributed (projected) into size_t space as long as the associated information.
55  \param [in] indexInfoMap initial index and information mapping
56  \param [in] clientIntraComm communicator of clients
57  \param [in] hierarLvl level of hierarchy
58*/
59template<typename T, typename H>
60CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
61                                                        const ep_lib::MPI_Comm& clientIntraComm)
62  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
63{
64  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
65  this->computeMPICommLevel();
66  int nbLvl = this->getNbLevel();
67  sendRank_.resize(nbLvl);
68  recvRank_.resize(nbLvl);
69  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
70}
71
72template<typename T, typename H>
73CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
74{
75}
76
77/*!
78  Compute mapping between indices and information corresponding to these indices
79  \param [in] indices indices a proc has
80*/
81template<typename T, typename H>
82void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
83{
84  int nbLvl = this->getNbLevel();
85  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
86}
87
88/*!
89    Compute mapping between indices and information corresponding to these indices
90for each level of hierarchical DHT. Recursive function
91   \param [in] indices indices a proc has
92   \param [in] commLevel communicator of current level
93   \param [in] level current level
94*/
95template<typename T, typename H>
96void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
97                                                                 const ep_lib::MPI_Comm& commLevel,
98                                                                 int level)
99{
100  int clientRank;
101  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
102  int groupRankBegin = this->getGroupBegin()[level];
103  int nbClient = this->getNbInGroup()[level];
104  std::vector<size_t> hashedIndex;
105  computeHashIndex(hashedIndex, nbClient);
106
107  size_t ssize = indices.numElements(), hashedVal;
108
109  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
110                                      iteClientHash = hashedIndex.end();
111  std::vector<int> sendBuff(nbClient,0);
112  std::vector<int> sendNbIndexBuff(nbClient,0);
113
114  // Number of global index whose mapping server are on other clients
115  int nbIndexToSend = 0;
116  size_t index;
117  HashXIOS<size_t> hashGlobalIndex;
118  std::unordered_map<size_t,int> nbIndices;
119  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
120  for (int i = 0; i < ssize; ++i)
121  {
122    index = indices(i);
123    if (0 == nbIndices.count(index))
124    {
125      hashedVal  = hashGlobalIndex(index);
126      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
127      int indexClient = std::distance(itbClientHash, itClientHash)-1;
128      ++sendNbIndexBuff[indexClient];
129      nbIndices[index] = 1;
130    }
131  }
132
133  std::unordered_map<int, size_t* > client2ClientIndex;
134  for (int idx = 0; idx < nbClient; ++idx)
135  {
136    if (0 != sendNbIndexBuff[idx])
137    {
138      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
139      nbIndexToSend += sendNbIndexBuff[idx];
140      sendBuff[idx] = 1;
141      sendNbIndexBuff[idx] = 0;
142    }
143  }
144
145  for (int i = 0; i < ssize; ++i)
146  {
147    index = indices(i);
148    if (1 == nbIndices[index])
149    {
150      hashedVal  = hashGlobalIndex(index);
151      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
152      int indexClient = std::distance(itbClientHash, itClientHash)-1;
153      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
154      ++sendNbIndexBuff[indexClient];
155      ++nbIndices[index];
156    }
157  }
158
159  std::vector<int> recvRankClient, recvNbIndexClientCount;
160  sendRecvRank(level, sendBuff, sendNbIndexBuff,
161               recvRankClient, recvNbIndexClientCount);
162
163  int recvNbIndexCount = 0;
164  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
165    recvNbIndexCount += recvNbIndexClientCount[idx];
166
167  unsigned long* recvIndexBuff;
168  if (0 != recvNbIndexCount)
169    recvIndexBuff = new unsigned long[recvNbIndexCount];
170
171  int request_size = 0;
172  for (int idx = 0; idx < recvRankClient.size(); ++idx)
173  {
174    if (0 != recvNbIndexClientCount[idx])
175      request_size ++;
176  }
177
178  request_size += client2ClientIndex.size();
179
180  std::vector<ep_lib::MPI_Request> request(request_size);
181  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
182                             iteRecvIndex = recvRankClient.end(),
183                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
184                           itRecvNbIndex;
185  int currentIndex = 0;
186  int nbRecvClient = recvRankClient.size();
187  int request_position = 0;
188  for (int idx = 0; idx < nbRecvClient; ++idx)
189  {
190    if (0 != recvNbIndexClientCount[idx])
191      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
192    currentIndex += recvNbIndexClientCount[idx];
193  }
194
195  std::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
196                                                iteIndex = client2ClientIndex.end();
197  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
198    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);
199
200  std::vector<ep_lib::MPI_Status> status(request.size());
201  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);
202
203  CArray<size_t,1>* tmpGlobalIndex;
204  if (0 != recvNbIndexCount)
205    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
206  else
207    tmpGlobalIndex = new CArray<size_t,1>();
208
209  // OK, we go to the next level and do something recursive
210  if (0 < level)
211  {
212    --level;
213    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
214  }
215  else // Now, we are in the last level where necessary mappings are.
216    indexToInfoMappingLevel_= (index2InfoMapping_);
217
218  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
219  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
220  currentIndex = 0;
221  for (int idx = 0; idx < nbRecvClient; ++idx)
222  {
223    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
224    {
225      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
226      if (iteIndexToInfoMap != itIndexToInfoMap)
227        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
228    }
229    currentIndex += recvNbIndexClientCount[idx];
230  }
231
232  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
233  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
234  int indexIndex = 0;
235  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
236  {
237    recvRankOnReturn[indexIndex] = itIndex->first;
238  }
239  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
240                   recvRankOnReturn, recvNbIndexOnReturn);
241
242  int recvNbIndexCountOnReturn = 0;
243  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
244    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
245
246  unsigned long* recvIndexBuffOnReturn;
247  unsigned char* recvInfoBuffOnReturn;
248  if (0 != recvNbIndexCountOnReturn)
249  {
250    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
251    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
252  }
253
254  int requestOnReturn_size=0;
255  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
256  {
257    if (0 != recvNbIndexOnReturn[idx])
258    {
259      requestOnReturn_size += 2;
260    }
261  }
262
263  for (int idx = 0; idx < nbRecvClient; ++idx)
264  {
265    if (0 != sendNbIndexOnReturn[idx])
266    {
267      requestOnReturn_size += 2;
268    }
269  }
270
271  int requestOnReturn_position=0;
272
273  std::vector<ep_lib::MPI_Request> requestOnReturn(requestOnReturn_size);
274  currentIndex = 0;
275  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
276  {
277    if (0 != recvNbIndexOnReturn[idx])
278    {
279      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
280      recvInfoFromClients(recvRankOnReturn[idx],
281                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
282                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
283                          commLevel, &requestOnReturn[requestOnReturn_position++]);
284    }
285    currentIndex += recvNbIndexOnReturn[idx];
286  }
287
288  std::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
289  std::unordered_map<int,size_t*> client2ClientIndexOnReturn;
290  currentIndex = 0;
291  for (int idx = 0; idx < nbRecvClient; ++idx)
292  {
293    if (0 != sendNbIndexOnReturn[idx])
294    {
295      int rank = recvRankClient[idx];
296      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
297      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
298      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
299      int infoIndex = 0;
300      int nb = 0;
301      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
302      {
303        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
304        if (iteIndexToInfoMap != itIndexToInfoMap)
305        {
306          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
307          for (int k = 0; k < infoTmp.size(); ++k)
308          {
309            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
310            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
311            ++nb;
312          }
313        }
314      }
315
316      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
317                         sendNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
318      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
319                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, &requestOnReturn[requestOnReturn_position++]);
320    }
321    currentIndex += recvNbIndexClientCount[idx];
322  }
323
324  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
325  ep_lib::MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
326
327  Index2VectorInfoTypeMap indexToInfoMapping;
328  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
329  int infoIndex = 0;
330  InfoType unpackedInfo;
331  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
332  {
333    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
334    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
335  }
336
337  indexToInfoMappingLevel_.swap(indexToInfoMapping);
338  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
339  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
340                                                        it != client2ClientIndex.end(); ++it)
341      delete [] it->second;
342  delete tmpGlobalIndex;
343
344  if (0 != recvNbIndexCountOnReturn)
345  {
346    delete [] recvIndexBuffOnReturn;
347    delete [] recvInfoBuffOnReturn;
348  }
349
350  for (std::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
351                                                               it != client2ClientInfoOnReturn.end(); ++it)
352      delete [] it->second;
353
354  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
355                                            it != client2ClientIndexOnReturn.end(); ++it)
356      delete [] it->second;
357}
358
359/*!
360  Compute the hash index distribution of whole size_t space then each client will have a range of this distribution
361*/
362template<typename T, typename H>
363void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
364{
365  // Compute range of hash index for each client
366  hashedIndex.resize(nbClient+1);
367  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
368  size_t nbHashIndex;
369  hashedIndex[0] = 0;
370  for (int i = 1; i < nbClient; ++i)
371  {
372    nbHashIndex = nbHashIndexMax / nbClient;
373    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
374    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
375  }
376  hashedIndex[nbClient] = nbHashIndexMax;
377}
378
379/*!
380  Compute distribution of global index for servers
381  Each client already holds a piece of information and its associated index.
382This information will be redistributed among processes by projecting indices into size_t space,
383the corresponding information will be also distributed on size_t space.
384After the redistribution, each client holds rearranged index and its corresponding information.
385  \param [in] indexInfoMap index and its corresponding info (usually server index)
386  \param [in] commLevel communicator of current level
387  \param [in] level current level
388*/
389template<typename T, typename H>
390void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
391                                                            const ep_lib::MPI_Comm& commLevel,
392                                                            int level)
393{
394  int clientRank;
395  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
396  computeSendRecvRank(level, clientRank);
397
398  int groupRankBegin = this->getGroupBegin()[level];
399  int nbClient = this->getNbInGroup()[level];
400  std::vector<size_t> hashedIndex;
401  computeHashIndex(hashedIndex, nbClient);
402
403  std::vector<int> sendBuff(nbClient,0);
404  std::vector<int> sendNbIndexBuff(nbClient,0);
405  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
406                                      iteClientHash = hashedIndex.end();
407  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
408                                                   ite = indexInfoMap.end();
409  HashXIOS<size_t> hashGlobalIndex;
410
411  // Compute size of sending and receving buffer
412  for (it = itb; it != ite; ++it)
413  {
414    size_t hashIndex = hashGlobalIndex(it->first);
415    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
416    int indexClient = std::distance(itbClientHash, itClientHash)-1;
417    sendNbIndexBuff[indexClient] += it->second.size();
418  }
419
420  std::unordered_map<int, size_t*> client2ClientIndex;
421  std::unordered_map<int, unsigned char*> client2ClientInfo;
422  for (int idx = 0; idx < nbClient; ++idx)
423  {
424    if (0 != sendNbIndexBuff[idx])
425    {
426      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
427      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
428      sendNbIndexBuff[idx] = 0;
429      sendBuff[idx] = 1;
430    }
431  }
432
433  std::vector<int> sendNbInfo(nbClient,0);
434  for (it = itb; it != ite; ++it)
435  {
436    const std::vector<InfoType>& infoTmp = it->second;
437    size_t hashIndex = hashGlobalIndex(it->first);
438    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
439    int indexClient = std::distance(itbClientHash, itClientHash)-1;
440    for (int idx = 0; idx < infoTmp.size(); ++idx)
441    {
442      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;;
443  //          ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
444      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
445      ++sendNbIndexBuff[indexClient];
446    }
447  }
448
449  // Calculate from how many clients each client receive message.
450  // Calculate size of buffer for receiving message
451  std::vector<int> recvRankClient, recvNbIndexClientCount;
452  sendRecvRank(level, sendBuff, sendNbIndexBuff,
453               recvRankClient, recvNbIndexClientCount);
454
455  int recvNbIndexCount = 0;
456  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
457    recvNbIndexCount += recvNbIndexClientCount[idx];
458
459  unsigned long* recvIndexBuff;
460  unsigned char* recvInfoBuff;
461  if (0 != recvNbIndexCount)
462  {
463    recvIndexBuff = new unsigned long[recvNbIndexCount];
464    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
465  }
466
467  // If a client holds information about index and the corresponding which don't belong to it,
468  // it will send a message to the correct clients.
469  // Contents of the message are index and its corresponding informatioin
470  int request_size = 0;
471   for (int idx = 0; idx < recvRankClient.size(); ++idx)
472   {
473     if (0 != recvNbIndexClientCount[idx])
474     {
475       request_size += 2;
476     }
477   }
478 
479   request_size += client2ClientIndex.size();
480   request_size += client2ClientInfo.size();
481 
482   std::vector<ep_lib::MPI_Request> request(request_size);
483
484  int currentIndex = 0;
485  int nbRecvClient = recvRankClient.size();
486  int request_position=0;
487  for (int idx = 0; idx < nbRecvClient; ++idx)
488  {
489    if (0 != recvNbIndexClientCount[idx])
490    {
491      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
492      //if(clientRank==0) printf("recv index request = %p, inserted = %p, request = %d\n", &request[0], request.back(), *static_cast< int*>(request.back()->mpi_request));
493      recvInfoFromClients(recvRankClient[idx],
494                            recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
495                            recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
496                            commLevel, &request[request_position++]);
497      //if(clientRank==0) printf("recv info request = %p, inserted = %p, request = %d\n", &request[0], request.back(), *static_cast< int*>(request.back()->mpi_request));
498    }
499    currentIndex += recvNbIndexClientCount[idx];
500  }
501
502  std::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
503                                                iteIndex = client2ClientIndex.end();
504  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
505  {  sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);
506  }    //if(clientRank==0) printf("send index request = %p, inserted = %p, request = %d\n", &request[0], request.back(), *static_cast< int*>(request.back()->mpi_request));}
507  std::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
508                                                      iteInfo = client2ClientInfo.end();
509  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
510   { sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, &request[request_position++]);
511   }//   if(clientRank==0) printf("send info request = %p, inserted = %p, request = %d\n", &request[0], request.back(), *static_cast< int*>(request.back()->mpi_request));}
512
513  std::vector<ep_lib::MPI_Status> status(request.size());
514 
515  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);
516
517  Index2VectorInfoTypeMap indexToInfoMapping;
518  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
519  currentIndex = 0;
520  InfoType infoValue;
521  int infoIndex = 0;
522  unsigned char* infoBuff = recvInfoBuff;
523  for (int idx = 0; idx < nbRecvClient; ++idx)
524  {
525    size_t index;
526    int count = recvNbIndexClientCount[idx];
527    for (int i = 0; i < count; ++i)
528    {
529      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
530      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
531    }
532    currentIndex += count;
533  }
534
535  if (0 != recvNbIndexCount)
536  {
537    delete [] recvIndexBuff;
538    delete [] recvInfoBuff;
539  }
540  for (std::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
541                                                               it != client2ClientInfo.end(); ++it)
542      delete [] it->second;
543
544  for (std::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
545                                                        it != client2ClientIndex.end(); ++it)
546      delete [] it->second;
547
548  // Ok, now do something recursive
549  if (0 < level)
550  {
551    --level;
552    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
553  }
554  else
555    index2InfoMapping_.swap(indexToInfoMapping);
556}
557
558/*!
559  Send message containing index to clients
560  \param [in] clientDestRank rank of destination client
561  \param [in] indices index to send
562  \param [in] indiceSize size of index array to send
563  \param [in] clientIntraComm communication group of client
564  \param [in] requestSendIndex list of sending request
565*/
566template<typename T, typename H>
567void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
568                                                       const ep_lib::MPI_Comm& clientIntraComm,
569                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
570{
571  ep_lib::MPI_Request request;
572  requestSendIndex.push_back(request);
573  ep_lib::MPI_Isend(indices, indiceSize, EP_UNSIGNED_LONG,
574            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
575}
576
577/*!
578  Send message containing index to clients
579  \param [in] clientDestRank rank of destination client
580  \param [in] indices index to send
581  \param [in] indiceSize size of index array to send
582  \param [in] clientIntraComm communication group of client
583  \param [in] requestSendIndex sending request
584*/
585template<typename T, typename H>
586void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
587                                                       const ep_lib::MPI_Comm& clientIntraComm,
588                                                       ep_lib::MPI_Request* requestSendIndex)
589{
590  ep_lib::MPI_Isend(indices, indiceSize, EP_UNSIGNED_LONG,
591            clientDestRank, MPI_DHT_INDEX, clientIntraComm, requestSendIndex);
592}
593
594/*!
595  Receive message containing index to clients
596  \param [in] clientDestRank rank of destination client
597  \param [in] indices index to send
598  \param [in] clientIntraComm communication group of client
599  \param [in] requestRecvIndex list of receiving request
600*/
601template<typename T, typename H>
602void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
603                                                         const ep_lib::MPI_Comm& clientIntraComm,
604                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
605{
606  ep_lib::MPI_Request request;
607  requestRecvIndex.push_back(request);
608  ep_lib::MPI_Irecv(indices, indiceSize, EP_UNSIGNED_LONG,
609            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
610}
611
612/*!
613  Receive message containing index to clients
614  \param [in] clientDestRank rank of destination client
615  \param [in] indices index to send
616  \param [in] clientIntraComm communication group of client
617  \param [in] requestRecvIndex receiving request
618*/
619template<typename T, typename H>
620void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
621                                                         const ep_lib::MPI_Comm& clientIntraComm,
622                                                         ep_lib::MPI_Request *requestRecvIndex)
623{
624  ep_lib::MPI_Irecv(indices, indiceSize, EP_UNSIGNED_LONG,
625            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, requestRecvIndex);
626}
627
628/*!
629  Send message containing information to clients
630  \param [in] clientDestRank rank of destination client
631  \param [in] info info array to send
632  \param [in] infoSize info array size to send
633  \param [in] clientIntraComm communication group of client
634  \param [in] requestSendInfo list of sending request
635*/
636template<typename T, typename H>
637void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
638                                                      const ep_lib::MPI_Comm& clientIntraComm,
639                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
640{
641  ep_lib::MPI_Request request;
642  requestSendInfo.push_back(request);
643
644  ep_lib::MPI_Isend(info, infoSize, EP_CHAR,
645            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
646}
647
648/*!
649  Send message containing information to clients
650  \param [in] clientDestRank rank of destination client
651  \param [in] info info array to send
652  \param [in] infoSize info array size to send
653  \param [in] clientIntraComm communication group of client
654  \param [in] requestSendInfo sending request
655*/
656template<typename T, typename H>
657void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
658                                                      const ep_lib::MPI_Comm& clientIntraComm,
659                                                      ep_lib::MPI_Request *requestSendInfo)
660{
661  ep_lib::MPI_Isend(info, infoSize, EP_CHAR,
662            clientDestRank, MPI_DHT_INFO, clientIntraComm, requestSendInfo);
663}
664
665/*!
666  Receive message containing information from other clients
667  \param [in] clientDestRank rank of destination client
668  \param [in] info info array to receive
669  \param [in] infoSize info array size to receive
670  \param [in] clientIntraComm communication group of client
671  \param [in] requestRecvInfo list of sending request
672*/
673template<typename T, typename H>
674void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
675                                                        const ep_lib::MPI_Comm& clientIntraComm,
676                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
677{
678  ep_lib::MPI_Request request;
679  requestRecvInfo.push_back(request);
680
681  ep_lib::MPI_Irecv(info, infoSize, EP_CHAR,
682            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
683}
684
685/*!
686  Receive message containing information from other clients
687  \param [in] clientDestRank rank of destination client
688  \param [in] info info array to receive
689  \param [in] infoSize info array size to receive
690  \param [in] clientIntraComm communication group of client
691  \param [in] requestRecvInfo list of receiving request
692*/
693template<typename T, typename H>
694void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
695                                                        const ep_lib::MPI_Comm& clientIntraComm,
696                                                        ep_lib::MPI_Request* requestRecvInfo)
697{
698  ep_lib::MPI_Irecv(info, infoSize, EP_CHAR,
699            clientSrcRank, MPI_DHT_INFO, clientIntraComm, requestRecvInfo);
700}
701
702/*!
703  Compute how many processes one process needs to send to and from how many processes it will receive.
704  This computation is only based on hierachical structure of distributed hash table (e.x: number of processes)
705*/
706template<typename T, typename H>
707void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
708{
709  int groupBegin = this->getGroupBegin()[level];
710  int nbInGroup  = this->getNbInGroup()[level];
711  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
712  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
713
714  std::vector<size_t> hashedIndexGroup;
715  computeHashIndex(hashedIndexGroup, nbInGroup);
716  size_t a = hashedIndexGroup[rank-groupBegin];
717  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
718
719  int currentGroup, offset;
720  size_t e,f;
721
722  // Do a simple math [a,b) intersect [c,d)
723  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
724  {
725    std::vector<size_t> hashedIndexGroupParent;
726    int nbInGroupParent = nbInGroupParents[idx];
727    if (0 != nbInGroupParent)
728      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
729    for (int i = 0; i < nbInGroupParent; ++i)
730    {
731      size_t c = hashedIndexGroupParent[i];
732      size_t d = hashedIndexGroupParent[i+1]-1;
733
734    if (!((d < a) || (b <c)))
735        recvRank_[level].push_back(groupParentBegin[idx]+i);
736    }
737
738    offset = rank - groupParentBegin[idx];
739    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
740    {
741      e = hashedIndexGroupParent[offset];
742      f = hashedIndexGroupParent[offset+1]-1;
743    }
744  }
745
746  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
747                                      iteHashGroup = hashedIndexGroup.end();
748  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
749  int begin = std::distance(itbHashGroup, itHashGroup)-1;
750  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
751  int end = std::distance(itbHashGroup, itHashGroup) -1;
752  sendRank_[level].resize(end-begin+1);
753  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
754}
755
756/*!
757  Compute number of clients as well as corresponding number of elements each client will receive on returning searching result
758  \param [in] sendNbRank Rank of clients to send to
759  \param [in] sendNbElements Number of elements each client to send to
760  \param [in] receiveNbRank Rank of clients to receive from
761  \param [out] recvNbElements Number of elements each client to send to
762*/
763template<typename T, typename H>
764void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
765                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
766{
767  recvNbElements.resize(recvNbRank.size());
768  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
769  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
770
771  int nRequest = 0;
772  for (int idx = 0; idx < recvNbRank.size(); ++idx)
773  {
774    ep_lib::MPI_Irecv(&recvNbElements[0]+idx, 1, EP_INT,
775              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
776    ++nRequest;
777  }
778
779  for (int idx = 0; idx < sendNbRank.size(); ++idx)
780  {
781    ep_lib::MPI_Isend(&sendNbElements[0]+idx, 1, EP_INT,
782              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
783    ++nRequest;
784  }
785
786  ep_lib::MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
787}
788
789/*!
790  Send and receive number of process each process need to listen to as well as number
791  of index it will receive during the initalization phase
792  \param [in] level current level
793  \param [in] sendNbRank Rank of clients to send to
794  \param [in] sendNbElements Number of elements each client to send to
795  \param [out] receiveNbRank Rank of clients to receive from
796  \param [out] recvNbElements Number of elements each client to send to
797*/
798template<typename T, typename H>
799void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
800                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
801                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
802{
803  int groupBegin = this->getGroupBegin()[level];
804
805  int offSet = 0;
806  std::vector<int>& sendRank = sendRank_[level];
807  std::vector<int>& recvRank = recvRank_[level];
808  int sendBuffSize = sendRank.size();
809  std::vector<int> sendBuff(sendBuffSize*2);
810  int recvBuffSize = recvRank.size();
811  std::vector<int> recvBuff(recvBuffSize*2,0);
812
813  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
814  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
815
816  int nRequest = 0;
817  for (int idx = 0; idx < recvBuffSize; ++idx)
818  {
819    ep_lib::MPI_Irecv(&recvBuff[0]+2*idx, 2, EP_INT,
820              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
821    ++nRequest;
822  }
823
824  for (int idx = 0; idx < sendBuffSize; ++idx)
825  {
826    offSet = sendRank[idx]-groupBegin;
827    sendBuff[idx*2] = sendNbRank[offSet];
828    sendBuff[idx*2+1] = sendNbElements[offSet];
829  }
830
831  for (int idx = 0; idx < sendBuffSize; ++idx)
832  {
833    ep_lib::MPI_Isend(&sendBuff[idx*2], 2, EP_INT,
834              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
835    ++nRequest;
836  }
837
838  ep_lib::MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
839  int nbRecvRank = 0, nbRecvElements = 0;
840  recvNbRank.clear();
841  recvNbElements.clear();
842  for (int idx = 0; idx < recvBuffSize; ++idx)
843  {
844    if (0 != recvBuff[2*idx])
845    {
846      recvNbRank.push_back(recvRank[idx]);
847      recvNbElements.push_back(recvBuff[2*idx+1]);
848    }
849  }
850}
851
852}
Note: See TracBrowser for help on using the repository browser.