/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}

/*!
  Constructor with the initial distribution information and the corresponding indices.
  Each client (process) holds a piece of information and the attached index; the indices
will be redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial mapping between indices and information
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

/*!
  Constructor with the initial distribution information and the corresponding indices.
  Each client (process) holds a piece of information and the attached index; the indices
will be redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial mapping between indices and information
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  ep_lib::MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}
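
/*!
  A minimal construction sketch (illustration only, not part of the implementation).
  It assumes an info type of int and that the hierarchy policy H has a usable default
  when the template is instantiated with a single argument; both assumptions are
  hypothetical and may differ from the typedefs actually used elsewhere in XIOS.
  \code
  // clientIntraComm: an intra-communicator gathering all client processes (obtained elsewhere).
  int rank;
  ep_lib::MPI_Comm_rank(clientIntraComm, &rank);

  // Each process declares the indices it owns and the info attached to them
  // (here the info is simply the local MPI rank).
  CClientClientDHTTemplate<int>::Index2VectorInfoTypeMap localMap;
  for (size_t idx = 100 * rank; idx < 100 * (rank + 1); ++idx)
    localMap[idx].push_back(rank);

  // The constructor redistributes the (index, info) pairs over the hashed size_t space.
  CClientClientDHTTemplate<int> dht(localMap, clientIntraComm);
  \endcode
*/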

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}
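
/*!
  A minimal query sketch (illustration only): after computeIndexInfoMapping() the
  mapping for the requested indices is available on the calling process. The getter
  name used below (getInfoIndexMap) is an assumption about the public interface and
  may differ; the point is only to show the request/lookup flow for a dht built as in
  the constructor sketch above.
  \code
  CArray<size_t,1> wanted(3);
  wanted(0) = 42; wanted(1) = 4096; wanted(2) = 123456;   // indices owned by unknown processes

  dht.computeIndexInfoMapping(wanted);                    // collective over the client communicator

  const CClientClientDHTTemplate<int>::Index2VectorInfoTypeMap& result = dht.getInfoIndexMap();
  CClientClientDHTTemplate<int>::Index2VectorInfoTypeMap::const_iterator it = result.find(42);
  int ownerRank = (it != result.end()) ? it->second[0] : -1;   // -1: nobody declared index 42
  \endcode
*/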

/*!
    Compute mapping between indices and information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
   \param [in] indices indices a proc has
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const ep_lib::MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping server is on other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff = NULL;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  int request_size = 0;
  for (int idx = 0; idx < recvRankClient.size(); ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      ++request_size;
  }

  request_size += client2ClientIndex.size();

  std::vector<ep_lib::MPI_Request> request(request_size);

  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                             itbRecvNbIndex = recvNbIndexClientCount.begin(),
                             itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  int request_position = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);

  std::vector<ep_lib::MPI_Status> status(request.size());
  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // OK, we go to the next level and do something recursive
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // Now, we are in the last level where the necessary mappings are.
    indexToInfoMappingLevel_ = index2InfoMapping_;

  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn = NULL;
  unsigned char* recvInfoBuffOnReturn = NULL;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  int requestOnReturn_size = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      requestOnReturn_size += 2;
    }
  }

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      requestOnReturn_size += 2;
    }
  }

  int requestOnReturn_position = 0;

  std::vector<ep_lib::MPI_Request> requestOnReturn(requestOnReturn_size);
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, &requestOnReturn[requestOnReturn_position++]);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, &requestOnReturn[requestOnReturn_position++]);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, &requestOnReturn[requestOnReturn_position++]);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
  ep_lib::MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                                it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                                         it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}
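
/*!
  Routing sketch (illustration only): this is the owner-selection step used twice above,
  written standalone. HashXIOS is not reproduced here; any hash that maps a global index
  uniformly onto size_t plays the same role. (<vector> and <algorithm> assumed included.)
  \code
  // hashedIndex holds nbClient+1 increasing boundaries covering the size_t space
  // (see computeHashIndex below). The owner of a hashed value h is the client whose
  // half-open slot [hashedIndex[i], hashedIndex[i+1]) contains h.
  int ownerOf(size_t h, const std::vector<size_t>& hashedIndex)
  {
    std::vector<size_t>::const_iterator it =
      std::upper_bound(hashedIndex.begin(), hashedIndex.end(), h);
    return static_cast<int>(std::distance(hashedIndex.begin(), it)) - 1;
  }
  \endcode
*/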

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned one range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
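
/*!
  Worked example (illustration only): for nbClient = 4 the boundaries produced above split
  the size_t space into four nearly equal, contiguous ranges. computeHashIndex is called
  here as if it were freely accessible, purely for illustration.
  \code
  std::vector<size_t> bounds;
  computeHashIndex(bounds, 4);
  // bounds.size() == 5, bounds[0] == 0, bounds[4] == std::numeric_limits<size_t>::max()
  // client i is responsible for hashed values in [bounds[i], bounds[i+1])
  // and each range size differs from max/4 by at most one.
  \endcode
*/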

/*!
  Compute the distribution of global indices among the servers.
  Each client already holds a piece of information and its associated index.
This information will be redistributed among processes by projecting the indices onto the size_t space,
and the corresponding information will be distributed accordingly.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const ep_lib::MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  ep_lib::MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(), it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Compute from how many clients each client receives messages
  // and the size of the buffers needed to receive them.
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff = NULL;
  unsigned char* recvInfoBuff = NULL;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and the corresponding information that do not belong to it,
  // it sends them to the appropriate clients.
  // The content of each message is the indices and their corresponding information.
  int request_size = 0;
  for (int idx = 0; idx < recvRankClient.size(); ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      request_size += 2;
    }
  }

  request_size += client2ClientIndex.size();
  request_size += client2ClientInfo.size();

  std::vector<ep_lib::MPI_Request> request(request_size);
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  int request_position = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, &request[request_position++]);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, &request[request_position++]);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, &request[request_position++]);
  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, &request[request_position++]);

  std::vector<ep_lib::MPI_Status> status(request.size());
  ep_lib::MPI_Waitall(request.size(), &request[0], &status[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                                it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // OK, now do something recursive
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}
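
/*!
  Packing sketch (illustration only): ProcessDHTElement is defined elsewhere; for a plain
  InfoType such as int its role amounts to copying the value into a byte buffer and
  advancing a running offset, as sketched below. This is an assumption about the simple
  case, not a restatement of the real (possibly more elaborate) packing code.
  \code
  #include <cstring>

  // Pack one int at byte offset 'pos' of 'buff' and advance 'pos'.
  void packInt(int value, unsigned char* buff, int& pos)
  {
    std::memcpy(buff + pos, &value, sizeof(int));
    pos += sizeof(int);
  }

  // Unpack symmetrically; after a pack/unpack round trip the value is recovered.
  void unpackInt(int& value, const unsigned char* buff, int& pos)
  {
    std::memcpy(&value, buff + pos, sizeof(int));
    pos += sizeof(int);
  }
  \endcode
*/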

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
{
  ep_lib::MPI_Request request;
  requestSendIndex.push_back(request);
  ep_lib::MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       ep_lib::MPI_Request* requestSendIndex)
{
  ep_lib::MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, requestSendIndex);
}

/*!
  Receive a message containing indices from a client
  \param [in] clientSrcRank rank of source client
  \param [in] indices buffer in which the received indices are stored
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
{
  ep_lib::MPI_Request request;
  requestRecvIndex.push_back(request);
  ep_lib::MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Receive a message containing indices from a client
  \param [in] clientSrcRank rank of source client
  \param [in] indices buffer in which the received indices are stored
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex receiving request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         ep_lib::MPI_Request *requestRecvIndex)
{
  ep_lib::MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, requestRecvIndex);
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
{
  ep_lib::MPI_Request request;
  requestSendInfo.push_back(request);

  ep_lib::MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      ep_lib::MPI_Request *requestSendInfo)
{
  ep_lib::MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, requestSendInfo);
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of source client
  \param [in] info buffer in which the received info is stored
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
{
  ep_lib::MPI_Request request;
  requestRecvInfo.push_back(request);

  ep_lib::MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of source client
  \param [in] info buffer in which the received info is stored
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo receiving request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        ep_lib::MPI_Request* requestRecvInfo)
{
  ep_lib::MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, requestRecvInfo);
}
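
/*!
  Completion sketch (illustration only): the pointer overloads above each post one
  non-blocking operation into a request slot owned by the caller, who completes the whole
  exchange with a single MPI_Waitall, as done in computeIndexInfoMappingLevel and
  computeDistributedIndex. srcRank, dstRank, comm and the buffers/counts below are
  placeholders standing for values computed by the caller.
  \code
  std::vector<ep_lib::MPI_Request> req(2);
  int pos = 0;
  recvIndexFromClients(srcRank, recvBuff, recvCount, comm, &req[pos++]);
  sendIndexToClients(dstRank, sendBuff, sendCount, comm, &req[pos++]);

  std::vector<ep_lib::MPI_Status> sta(req.size());
  ep_lib::MPI_Waitall(req.size(), &req[0], &sta[0]);   // buffers must stay alive until here
  \endcode
*/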

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int offset;
  size_t e,f;

  // Simple interval arithmetic: does [a,b] intersect [c,d]?
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset < nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}
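
/*!
  Intersection test sketch (illustration only): with closed intervals [a,b] and [c,d],
  the two overlap exactly when neither lies entirely below the other, which is the
  condition !((d < a) || (b < c)) used above.
  \code
  bool overlaps(size_t a, size_t b, size_t c, size_t d)
  {
    return !((d < a) || (b < c));   // e.g. [0,9] vs [5,20] -> true, [0,4] vs [5,9] -> false
  }
  \endcode
*/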

/*!
  Compute the number of clients as well as the number of elements each client will receive when returning the search results
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    ep_lib::MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    ep_lib::MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  ep_lib::MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to as well as the number
  of indices it will receive during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    ep_lib::MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest++]);
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    ep_lib::MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest++]);
  }

  ep_lib::MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}
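
/*!
  Message-layout sketch (illustration only): each peer exchange above carries two ints per
  rank, a flag telling whether the sender actually has data for that rank and the number
  of indices that will follow, so the receiver can filter out empty peers. The names below
  refer to the local variables of sendRecvRank.
  \code
  int msg[2];
  msg[0] = sendNbRank[offSet];       // 1 if this peer will receive data, 0 otherwise
  msg[1] = sendNbElements[offSet];   // how many indices will follow in the data phase

  // On the receiving side only peers with a non-zero flag are kept:
  // if (recvBuff[2*idx] != 0) { keep recvRank[idx] and recvBuff[2*idx+1]; }
  \endcode
*/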

}