source: XIOS/dev/branch_yushan_merged/src/client_client_dht_template_impl.hpp @ 1176

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"
#ifdef _usingEP
#include "ep_declaration.hpp"
#endif

namespace xios
{
/*!
  Constructor of the DHT without an initial index-information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}

/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the indices
  will be redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached indices; the indices
  will be redistributed (projected) onto the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute the mapping between indices and the information corresponding to these indices
  \param [in] indices indices a process holds
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}
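
/*!
  Example usage (a minimal sketch): each process publishes its local (global index -> info)
  pairs through the constructor, then queries the DHT for the info attached to an arbitrary
  set of indices. The typedef CClientClientDHTInt and the way the result is read back are
  assumptions about the surrounding code base and may differ in this branch.
  \code
  Index2InfoTypeMap localMap;            // global index -> info held by this process
  CClientClientDHTInt dht(localMap, intraComm);
  CArray<size_t,1> queriedIndices(nbQueried);
  dht.computeIndexInfoMapping(queriedIndices);
  // the (index -> vector<info>) answers are then held in the level mapping of the DHT
  \endcode
*/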

/*!
  Compute the mapping between indices and the information corresponding to these indices
  for each level of the hierarchical DHT. Recursive function.
  \param [in] indices indices a process holds
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const ep_lib::MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  ep_lib::MPI_Barrier(commLevel);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping information is held by other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
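  // First pass: each client owns a contiguous hash range [hashedIndex[k], hashedIndex[k+1]);
  // count, per destination client, how many distinct indices will be requested from it.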
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

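  // Second pass: fill each per-destination buffer with the distinct indices owned by that client.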
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

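  // Exchange with the other clients of this level who sends to whom and how many indices.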
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<ep_lib::MPI_Request> request;
  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
                           itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // If we are not yet at the last level, recurse on the indices received at this level
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // Now we are in the last level, where the necessary mappings are.
    indexToInfoMappingLevel_= (index2InfoMapping_);

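  // Prepare the reply: for every index received from a requesting client, count how many
  // info values this client can return (an index may be mapped to several info values).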
  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<ep_lib::MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

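  // Pack, for each requesting client, the matching (index, info) pairs and send them back.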
  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

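  // Unpack the received replies into the mapping of this level: index -> vector of info values.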
  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                               it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                            it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned
  a contiguous range of this distribution (e.g. with 4 clients the boundaries are roughly
  {0, max/4, max/2, 3*max/4, max}).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}

/*!
  Compute the distribution of global indices (and their associated info, usually server indices) among clients.
  Each client already holds a piece of information and its associated indices.
  This information is redistributed among processes by projecting the indices onto the size_t space;
  the corresponding information is distributed along with them.
  After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually a server index)
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const ep_lib::MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);
  ep_lib::MPI_Barrier(commLevel);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
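  // Pack, for each destination client, the indices and their info values (one entry per info value)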
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Calculate from how many clients each client receives a message,
  // and the size of the buffer needed to receive the messages
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and the corresponding information which do not belong to it,
  // it sends a message to the owning clients.
  // The contents of the message are the indices and their corresponding information
  std::vector<ep_lib::MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
  {
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  }

  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
  {
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
  }

  std::vector<ep_lib::MPI_Status> status(request.size());

  MPI_Waitall(request.size(), &request[0], &status[0]);

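  // Build the local part of the DHT: unpack every received (index, info) pair into index -> vector<info>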
  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    size_t index;
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                               it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Ok, now do something recursive
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
{
  ep_lib::MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive a message containing indices from a client
  \param [in] clientSrcRank rank of the source client
  \param [in] indices array in which the received indices are stored
  \param [in] indiceSize size of the index array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
{
  ep_lib::MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] info info array to send
  \param [in] infoSize size of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
{
  ep_lib::MPI_Request request;
  requestSendInfo.push_back(request);
  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of the source client
  \param [in] info array in which the received information is stored
  \param [in] infoSize size of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
{
  ep_lib::MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}

/*!
  Compute how many processes one process needs to send to, and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes)
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int currentGroup, offset;
  size_t e,f;

  // Do some simple math: [a,b) intersect [c,d)
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

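  // Determine the ranks of the current group whose hash ranges intersect [e,f], the range
  // this process owns at the parent level; these are the ranks this process must send to.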
  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}

/*!
  Compute the number of clients, as well as the corresponding number of elements, each client will receive when the search result is returned
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive, during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int my_rank;
  MPI_Comm_rank(this->internalComm_, &my_rank);

  int nRequest = 0;

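  // Pack two ints per destination: whether this process actually sends indices to that rank,
  // and how many indices it will send.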
  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  //MPI_Barrier(this->internalComm_);

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);

  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}

}