source: XIOS/dev/branch_openmp/src/client_client_dht_template_impl.hpp @ 1328

Last change on this file since 1328 was 1328, checked in by yushan, 6 years ago

dev_omp

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}

/*!
  Constructor with initial distribution information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

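/*
  Illustrative usage sketch (editor's addition, not part of the library): a client that locally
  holds a few global indices and their attached info can build the DHT with the constructor
  above and then query indices owned by other clients. The defaulted hierarchy parameter of
  CClientClientDHTTemplate<int>, the communicator variable clientIntraComm and the accessor
  getInfoIndexMap() are assumptions here, not guaranteed by this file.

    boost::unordered_map<size_t,int> indexInfoMap;    // global index -> info (e.g. server rank)
    indexInfoMap[42]   = 3;
    indexInfoMap[1337] = 7;
    CClientClientDHTTemplate<int> dht(indexInfoMap, clientIntraComm);  // collective call

    CArray<size_t,1> queries(2);
    queries(0) = 42; queries(1) = 99;                 // 99 may be held by another client
    dht.computeIndexInfoMapping(queries);             // collective; fills the level-0 mapping
    // const Index2VectorInfoTypeMap& result = dht.getInfoIndexMap();  // assumed accessor
*/
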
/*!
  Constructor with initial distribution information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute the mapping between indices and the information corresponding to these indices
  \param [in] indices indices held by this process
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
    Compute the mapping between indices and the information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
   \param [in] indices indices held by this process
   \param [in] commLevel communicator of the current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const ep_lib::MPI_Comm& commLevel,
                                                                 int level)
{
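  // Overview (editor's summary of the code below):
  //   1. Hash each queried index (duplicates are counted once) and determine, from the hash
  //      partition, which client of the current group is responsible for it.
  //   2. Exchange the queried indices with those clients using non-blocking sends/receives
  //      followed by a waitall.
  //   3. Recurse on the received indices at the next lower level; at level 0 the local
  //      index2InfoMapping_ is used directly. Either way indexToInfoMappingLevel_ then holds
  //      the information needed to answer the received queries.
  //   4. Send the found (index, info) pairs back to the requesting clients and swap the
  //      returned pairs into indexToInfoMappingLevel_.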
  int clientRank;
  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping server is on other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  int request_size = 0;
  for (int idx = 0; idx < recvRankClient.size(); ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      request_size ++;
  }

  request_size += client2ClientIndex.size();

  std::vector<ep_lib::MPI_Request> request(request_size);

  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
                           itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  int request_position = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
      //recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);
    //sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  std::vector<ep_lib::MPI_Status> status(request.size());
  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // OK, we go to the next level and do something recursive
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // Now, we are in the last level where necessary mappings are.
    indexToInfoMappingLevel_ = (index2InfoMapping_);

  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  int requestOnReturn_size = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      requestOnReturn_size += 2;
    }
  }

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      requestOnReturn_size += 2;
    }
  }

  int requestOnReturn_position = 0;

  std::vector<ep_lib::MPI_Request> requestOnReturn(requestOnReturn_size);
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      //recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      //recvInfoFromClients(recvRankOnReturn[idx],
      //                    recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
      //                    recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
      //                    commLevel, requestOnReturn);
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, &requestOnReturn[requestOnReturn_position++]);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      //sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
      //                   sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      //sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
      //                  sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, &requestOnReturn[requestOnReturn_position++]);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
  ep_lib::MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                               it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                            it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}

/*!
  Compute the hash index distribution of the whole size_t space; each client is then responsible for one range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}

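/*
  Worked example (editor's note): with nbClient = 4 the boundaries are roughly
  hashedIndex = {0, max/4, max/2, 3*max/4, max} where max = std::numeric_limits<size_t>::max();
  the callers then apply std::upper_bound to this vector, so client i is responsible for hash
  values h with hashedIndex[i] <= h < hashedIndex[i+1].
*/
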
/*!
  Compute the distribution of global indices for servers
  Each client already holds a piece of information and its associated index.
This information will be redistributed among the processes by projecting the indices into size_t space;
the corresponding information is redistributed accordingly.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const ep_lib::MPI_Comm& commLevel,
                                                            int level)
{
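  // Overview (editor's summary of the code below):
  //   1. Determine the communication partners for this level (computeSendRecvRank).
  //   2. Hash every locally held index to find the client of the current group responsible
  //      for it, and pack the indices and their info into per-destination buffers.
  //   3. Exchange buffer sizes (sendRecvRank), then the buffers themselves, with non-blocking
  //      sends/receives and a waitall.
  //   4. Unpack what was received into a new index-to-info map and recurse to the next lower
  //      level; at level 0 the map is kept as index2InfoMapping_.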
  int clientRank;
  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      //ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Calculate from how many clients each client receives messages.
  // Calculate the size of the buffers for receiving messages.
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and the corresponding information which do not belong to it,
  // it will send a message to the correct clients.
  // The contents of the message are the indices and their corresponding information.
  int request_size = 0;
  for (int idx = 0; idx < recvRankClient.size(); ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      request_size += 2;
    }
  }

  request_size += client2ClientIndex.size();
  request_size += client2ClientInfo.size();

  std::vector<ep_lib::MPI_Request> request(request_size);
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  int request_position = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      //recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      //recvInfoFromClients(recvRankClient[idx],
      //                    recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
      //                    recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
      //                    commLevel, request);
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, &request[request_position++]);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);
    //sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, &request[request_position++]);
    //sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);

  std::vector<ep_lib::MPI_Status> status(request.size());
  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    size_t index;
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                               it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Ok, now do something recursive
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
{
  ep_lib::MPI_Request request;
  requestSendIndex.push_back(request);
  ep_lib::MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       ep_lib::MPI_Request* requestSendIndex)
{
  ep_lib::MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, requestSendIndex);
}

/*!
  Receive a message containing indices from another client
  \param [in] clientSrcRank rank of the source client
  \param [in] indices buffer in which the indices are received
  \param [in] indiceSize number of indices to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
{
  ep_lib::MPI_Request request;
  requestRecvIndex.push_back(request);
  ep_lib::MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Receive a message containing indices from another client
  \param [in] clientSrcRank rank of the source client
  \param [in] indices buffer in which the indices are received
  \param [in] indiceSize number of indices to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex receiving request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         ep_lib::MPI_Request *requestRecvIndex)
{
  ep_lib::MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, requestRecvIndex);
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
{
  ep_lib::MPI_Request request;
  requestSendInfo.push_back(request);

  ep_lib::MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      ep_lib::MPI_Request *requestSendInfo)
{
  ep_lib::MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, requestSendInfo);
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of the source client
  \param [in] info info array to receive
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
{
  ep_lib::MPI_Request request;
  requestRecvInfo.push_back(request);

  ep_lib::MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of the source client
  \param [in] info info array to receive
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo receiving request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        ep_lib::MPI_Request* requestRecvInfo)
{
  ep_lib::MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, requestRecvInfo);
}

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int currentGroup, offset;
  size_t e,f;

  // Do a simple intersection test of the closed ranges [a,b] and [c,d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset < nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}

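/*
  Illustrative example (editor's note): if the current group has 2 processes, each responsible
  for half of the hash space, and its parent group has 4 processes, each responsible for a
  quarter, then the process owning the upper half overlaps the two upper quarter-ranges, so
  those two parent ranks are stored in recvRank_[level]. sendRank_[level] is built the other
  way round, from the process's own parent-level range [e,f] intersected with the ranges of
  the current group.
*/
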
/*!
  Compute the number of clients as well as the corresponding number of elements each client will receive when the search results are returned
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    ep_lib::MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    ep_lib::MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  ep_lib::MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive, during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    ep_lib::MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    ep_lib::MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  ep_lib::MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}
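
/*
  Editor's note on the exchange above: each message is a pair of ints {flag, nbElements}. The
  callers pass their 0/1 "has something to send" array as the sendNbRank argument and the
  element counts as sendNbElements; the receiver keeps only the pairs whose first int is
  non-zero when rebuilding recvNbRank and recvNbElements.
*/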

}