source: XIOS/dev/branch_yushan_merged/src/client_client_dht_template_impl.hpp @ 1185

Last change on this file since 1185 was 1185, checked in by yushan, 7 years ago

save dev. recv_test OK

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"
#ifdef _usingEP
#include "ep_declaration.hpp"
#include "ep_lib.hpp"
#endif

namespace xios
{
/*!
  Constructor without an initial index/info mapping.
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}

/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}
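
// ---------------------------------------------------------------------------
// Usage sketch (illustrative, not part of the original file). It assumes the
// info type is an int (e.g. a server rank), H is the hierarchy policy used to
// split the communicator, and that the class exposes a getInfoIndexMap()
// accessor for the result map; treat those names as assumptions about the
// interface declared in client_client_dht_template.hpp.
//
//   Index2VectorInfoTypeMap indexServerMap;             // global index -> info (e.g. server rank)
//   indexServerMap[someGlobalIndex].push_back(someRank);
//   CClientClientDHTTemplate<int,H> dht(indexServerMap, clientIntraComm);
//
//   CArray<size_t,1> queried(nbQuery);                  // indices whose info this process needs
//   dht.computeIndexInfoMapping(queried);               // collective over clientIntraComm
//   const Index2VectorInfoTypeMap& found = dht.getInfoIndexMap();
// ---------------------------------------------------------------------------
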
/*!
  Constructor with an initial distribution of information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute the mapping between indices and the information corresponding to these indices.
  \param [in] indices indices held by this process
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
    Compute the mapping between indices and the information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
   \param [in] indices indices held by this process
   \param [in] commLevel communicator of the current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const ep_lib::MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  ep_lib::MPI_Barrier(commLevel);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose hash owner is another client
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
  // First pass: count, per target client, the number of distinct indices to query
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  // Second pass: fill the per-client send buffers with the distinct indices
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<ep_lib::MPI_Request> request;
  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
                           itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // Go down one level of the hierarchy and recurse on the received indices
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // We are at the last level, where the necessary mappings are stored.
    indexToInfoMappingLevel_ = index2InfoMapping_;

  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<ep_lib::MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                               it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                            it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client owns one range of this distribution.
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices owned by each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
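
// ---------------------------------------------------------------------------
// Worked example (sketch). With nbClient = 4 the boundaries split the size_t
// range into four nearly equal blocks,
//   hashedIndex = { 0, ~max/4, ~max/2, ~3*max/4, max },
// and client i is responsible for hashed values falling in
// [hashedIndex[i], hashedIndex[i+1]). The owner of a hashed value h is then
// recovered the same way the callers above do it:
//
//   std::vector<size_t>::const_iterator it =
//       std::upper_bound(hashedIndex.begin(), hashedIndex.end(), h);
//   int owner = std::distance(hashedIndex.begin(), it) - 1;
// ---------------------------------------------------------------------------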

/*!
  Compute the distribution of global indices over the clients.
  Each client already holds a piece of information and its associated index.
This information will be redistributed among the processes by projecting the indices into the size_t space;
the corresponding information is distributed accordingly.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually the server index)
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const ep_lib::MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);
  ep_lib::MPI_Barrier(commLevel);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Determine from how many clients each client receives a message
  // and the size of the buffer needed to receive it
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds index/info pairs that do not belong to it,
  // it sends them to the correct clients.
  // The message contents are the indices and their corresponding information.
  std::vector<ep_lib::MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
  {
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  }

  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
  {
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
  }

  std::vector<ep_lib::MPI_Status> status(request.size());

  MPI_Waitall(request.size(), &request[0], &status[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  // Unpack the received indices and info into the local mapping
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    size_t index;
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                               it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                        it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Recurse down one level, or store the final mapping at the last level
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
}
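
// ---------------------------------------------------------------------------
// Info buffer layout (sketch). Values are packed back to back by
// ProcessDHTElement, each taking ProcessDHTElement<InfoType>::typeSize() bytes;
// the running byte offset is advanced by packElement/unpackElement themselves.
// "values" and "n" below are hypothetical placeholders.
//
//   int offset = 0;
//   unsigned char* buff = new unsigned char[n*ProcessDHTElement<InfoType>::typeSize()];
//   for (int k = 0; k < n; ++k)
//     ProcessDHTElement<InfoType>::packElement(values[k], buff, offset);
//
//   offset = 0;
//   InfoType v;
//   for (int k = 0; k < n; ++k)
//     ProcessDHTElement<InfoType>::unpackElement(v, buff, offset);   // reads back values[k]
//   delete [] buff;
// ---------------------------------------------------------------------------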

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indices indices to send
  \param [in] indiceSize size of the index array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
{
  ep_lib::MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive a message containing indices from a client
  \param [in] clientSrcRank rank of the source client
  \param [in] indices buffer that receives the indices
  \param [in] indiceSize number of indices to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvIndex list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
{
  ep_lib::MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] info info array to send
  \param [in] infoSize size in bytes of the info array to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
{
  ep_lib::MPI_Request request;
  requestSendInfo.push_back(request);
  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive a message containing information from another client
  \param [in] clientSrcRank rank of the source client
  \param [in] info buffer that receives the info array
  \param [in] infoSize size in bytes of the info array to receive
  \param [in] clientIntraComm communication group of clients
  \param [in] requestRecvInfo list of receiving requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
{
  ep_lib::MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}
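
// ---------------------------------------------------------------------------
// Exchange pattern shared by the four helpers above (sketch): each Isend/Irecv
// appends its request to a caller-owned std::vector<ep_lib::MPI_Request>, and
// the caller completes the whole exchange with one MPI_Waitall. Send and
// receive buffers must stay valid until that MPI_Waitall returns. The ranks,
// buffers and counts below are hypothetical placeholders.
//
//   std::vector<ep_lib::MPI_Request> request;
//   sendIndexToClients(destRank, indexBuff, nbIndex, commLevel, request);
//   recvIndexFromClients(srcRank, recvBuff, nbRecv, commLevel, request);
//   std::vector<ep_lib::MPI_Status> status(request.size());
//   MPI_Waitall(request.size(), &request[0], &status[0]);
// ---------------------------------------------------------------------------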

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is based only on the hierarchical structure of the distributed hash table (e.g. the number of processes).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int currentGroup, offset;
  size_t e,f;

  // Simple interval arithmetic: intersect [a,b] with [c,d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}
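
// ---------------------------------------------------------------------------
// Example of the intersection test used above (sketch). Two closed ranges
// [a,b] and [c,d] overlap exactly when neither lies entirely before the other,
// i.e. when !((d < a) || (b < c)). For instance, with a=10, b=19, c=15, d=30
// the test evaluates to !((30 < 10) || (19 < 15)) == true, so the parent rank
// owning [c,d] is appended to recvRank_[level].
// ---------------------------------------------------------------------------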

/*!
  Compute the number of clients and the corresponding number of elements each client will receive when the search result is returned
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each client
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank per-client flag (0/1) indicating whether a message will be sent to that client of the group
  \param [in] sendNbElements number of elements to send to each client of the group
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each client
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int my_rank;
  MPI_Comm_rank(this->internalComm_, &my_rank);

  int nRequest = 0;

  // Pack, for each destination, the "will send" flag and the number of elements
  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  //MPI_Barrier(this->internalComm_);

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);

  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
  // Keep only the peers that actually announced data to send
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}
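
// ---------------------------------------------------------------------------
// Message layout example (sketch). Each peer message in sendRecvRank carries
// two ints per rank: sendBuff[2*idx] is the 0/1 "I will send to you" flag taken
// from sendNbRank, and sendBuff[2*idx+1] is the number of indices that will
// follow. A receiver keeps only the peers whose flag is non-zero; e.g. a buffer
//   recvBuff = { 1, 128,   0, 0,   1, 42 }
// received from ranks {5, 7, 9} yields recvNbRank = {5, 9} and
// recvNbElements = {128, 42}.
// ---------------------------------------------------------------------------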

}