source: XIOS/dev/branch_yushan_merged/src/client_client_dht_template_impl.hpp @ 1164

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"
#ifdef _usingEP
#include "ep_declaration.hpp"
#endif


namespace xios
{
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
}

/*!
  Constructor with initial distribution information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the indices
will be redistributed (projected) into the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
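
  A minimal usage sketch (illustrative only; "SomeHierarchyPolicy" and the query array
  "indicesToLookUp" (a CArray<size_t,1> of global indices) are placeholders, not names
  defined in this file):
  \code
  typedef CClientClientDHTTemplate<int, SomeHierarchyPolicy> DHTInt;
  DHTInt::Index2InfoTypeMap localMap;            // locally known (global index -> info) pairs
  localMap[1024] = 3;                            // e.g. global index 1024 maps to server 3
  DHTInt dht(localMap, clientIntraComm);         // redistributes the pairs over the hashed index space
  dht.computeIndexInfoMapping(indicesToLookUp);  // looks up indices that may be held by other processes
  \endcode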
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  Index2VectorInfoTypeMap indexToVecInfoMap;
  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
}

/*!
  Constructor with initial distribution information and the corresponding indices.
  Each client (process) holds a piece of information as well as the attached index; the indices
will be redistributed (projected) into the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
                                                        const ep_lib::MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
{
  MPI_Comm_size(clientIntraComm, &nbClient_);
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
    Compute mapping between indices and information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
   \param [in] indices indices a proc has
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const ep_lib::MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping servers are on other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  boost::unordered_map<size_t,int> nbIndices;
  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (0 == nbIndices.count(index))
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      ++sendNbIndexBuff[indexClient];
      nbIndices[index] = 1;
    }
  }

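  // Two-pass construction: the loop above only counted, per destination client, how many
  // distinct indices have to be requested; the loop below allocates one send buffer per
  // destination and resets the counters so that a second pass can fill those buffers.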
  boost::unordered_map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    if (1 == nbIndices[index])
    {
      hashedVal  = hashGlobalIndex(index);
      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
      ++sendNbIndexBuff[indexClient];
      ++nbIndices[index];
    }
  }

  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  if (0 != recvNbIndexCount)
    recvIndexBuff = new unsigned long[recvNbIndexCount];

  std::vector<ep_lib::MPI_Request> request;
  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
                             iteRecvIndex = recvRankClient.end(),
                             itbRecvNbIndex = recvNbIndexClientCount.begin(),
                             itRecvNbIndex;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);

  std::vector<ep_lib::MPI_Status> status(request.size());
  MPI_Waitall(request.size(), &request[0], &status[0]);

  CArray<size_t,1>* tmpGlobalIndex;
  if (0 != recvNbIndexCount)
    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
  else
    tmpGlobalIndex = new CArray<size_t,1>();

  // OK, we go to the next level and do something recursive
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
  }
  else // Now, we are in the last level where necessary mappings are.
    indexToInfoMappingLevel_= (index2InfoMapping_);

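  // Return phase: for every requesting client, count how many info values this process can
  // answer with (a single requested index may map to several info values, hence second.size()).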
  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
    {
      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
      if (iteIndexToInfoMap != itIndexToInfoMap)
        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
  int indexIndex = 0;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
  {
    recvRankOnReturn[indexIndex] = itIndex->first;
  }
  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
                   recvRankOnReturn, recvNbIndexOnReturn);

  int recvNbIndexCountOnReturn = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];

  unsigned long* recvIndexBuffOnReturn;
  unsigned char* recvInfoBuffOnReturn;
  if (0 != recvNbIndexCountOnReturn)
  {
    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
  }

  std::vector<ep_lib::MPI_Request> requestOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
  {
    if (0 != recvNbIndexOnReturn[idx])
    {
      recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
      recvInfoFromClients(recvRankOnReturn[idx],
                          recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexOnReturn[idx];
  }

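  // Build the replies: for each requesting client, pack the matching indices and their info
  // values side by side (one index entry per packed info value) before sending them back.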
  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
  currentIndex = 0;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != sendNbIndexOnReturn[idx])
    {
      int rank = recvRankClient[idx];
      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
      int infoIndex = 0;
      int nb = 0;
      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
      {
        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
        if (iteIndexToInfoMap != itIndexToInfoMap)
        {
          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
          for (int k = 0; k < infoTmp.size(); ++k)
          {
            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
            ++nb;
          }
        }
      }

      sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
                         sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
      sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
                        sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);

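  // Unpack the replies: rebuild the (requested index -> info values) mapping on the requesting
  // side; the result replaces indexToInfoMappingLevel_ via the swap below.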
  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
  int infoIndex = 0;
  InfoType unpackedInfo;
  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
  {
    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;
  delete tmpGlobalIndex;

  if (0 != recvNbIndexCountOnReturn)
  {
    delete [] recvIndexBuffOnReturn;
    delete [] recvInfoBuffOnReturn;
  }

  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
                                                                it != client2ClientInfoOnReturn.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
                                                         it != client2ClientIndexOnReturn.end(); ++it)
      delete [] it->second;
}

/*!
  Compute the hash index distribution of the whole size_t space; each client then owns one range of this distribution.
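
  For example (illustrative): with nbClient = 4 the table has five entries, approximately
  {0, N/4, N/2, 3N/4, N} with N = std::numeric_limits<size_t>::max() (the remainder of the
  integer division is spread over the first ranks). Client c is then responsible for hashed
  values h with hashedIndex[c] <= h < hashedIndex[c+1], which is how the callers resolve
  ownership with std::upper_bound on this table.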
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute range of hash index for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}

/*!
  Compute the distribution of global indices over the servers.
  Each client already holds a piece of information and its associated index.
This information is redistributed among the processes by projecting the indices into the size_t space;
the corresponding information is distributed over that space as well.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
                                                            const ep_lib::MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(), it,
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute size of sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    sendNbIndexBuff[indexClient] += it->second.size();
  }

  boost::unordered_map<int, size_t*> client2ClientIndex;
  boost::unordered_map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

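  // Second pass: for every (index, info) pair, write the index into the per-destination index
  // buffer and pack the info into the matching byte buffer; sendNbIndexBuff and sendNbInfo
  // track the current write positions in those two buffers.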
  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    const std::vector<InfoType>& infoTmp = it->second;
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    for (int idx = 0; idx < infoTmp.size(); ++idx)
    {
      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
      ++sendNbIndexBuff[indexClient];
    }
  }

  // Calculate from how many clients each client receives a message
  // and the size of the buffer needed to receive it.
  std::vector<int> recvRankClient, recvNbIndexClientCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvRankClient, recvNbIndexClientCount);

  int recvNbIndexCount = 0;
  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
    recvNbIndexCount += recvNbIndexClientCount[idx];

  unsigned long* recvIndexBuff;
  unsigned char* recvInfoBuff;
  if (0 != recvNbIndexCount)
  {
    recvIndexBuff = new unsigned long[recvNbIndexCount];
    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
  }

  // If a client holds indices and corresponding information which do not belong to it,
  // it sends a message to the correct clients.
  // The contents of the message are the indices and their corresponding information.
  std::vector<ep_lib::MPI_Request> request;
  int currentIndex = 0;
  int nbRecvClient = recvRankClient.size();
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    if (0 != recvNbIndexClientCount[idx])
    {
      recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
      recvInfoFromClients(recvRankClient[idx],
                          recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
                          recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
                          commLevel, request);
    }
    currentIndex += recvNbIndexClientCount[idx];
  }

  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                                iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
  {
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
  }

  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                      iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
  {
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
  }

  std::vector<ep_lib::MPI_Status> status(request.size());

  MPI_Waitall(request.size(), &request[0], &status[0]);

  Index2VectorInfoTypeMap indexToInfoMapping;
  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
  currentIndex = 0;
  InfoType infoValue;
  int infoIndex = 0;
  unsigned char* infoBuff = recvInfoBuff;
  for (int idx = 0; idx < nbRecvClient; ++idx)
  {
    size_t index;
    int count = recvNbIndexClientCount[idx];
    for (int i = 0; i < count; ++i)
    {
      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
    }
    currentIndex += count;
  }

  if (0 != recvNbIndexCount)
  {
    delete [] recvIndexBuff;
    delete [] recvInfoBuff;
  }
  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
                                                                it != client2ClientInfo.end(); ++it)
      delete [] it->second;

  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
                                                         it != client2ClientIndex.end(); ++it)
      delete [] it->second;

  // Ok, now do something recursive
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_.swap(indexToInfoMapping);
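  // At level 0 the recursion stops: index2InfoMapping_ then holds the (index, info) pairs
  // that the hash distribution assigns to this process.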
}

/*!
  Send message containing index to clients
  \param [in] clientDestRank rank of destination client
  \param [in] indices index to send
  \param [in] indiceSize size of index array to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendIndex list of sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const ep_lib::MPI_Comm& clientIntraComm,
                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
{
  ep_lib::MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Receive message containing indices from other clients
  \param [in] clientSrcRank rank of the source client
  \param [in] indices buffer in which the received indices are stored
  \param [in] indiceSize size of the index array to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvIndex list of receiving request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
                                                         const ep_lib::MPI_Comm& clientIntraComm,
                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
{
  ep_lib::MPI_Request request;
  requestRecvIndex.push_back(request);
  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
}

/*!
  Send message containing information to clients
  \param [in] clientDestRank rank of destination client
  \param [in] info info array to send
  \param [in] infoSize info array size to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendInfo list of sending request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const ep_lib::MPI_Comm& clientIntraComm,
                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
{
  ep_lib::MPI_Request request;
  requestSendInfo.push_back(request);
  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Receive message containing information from other clients
  \param [in] clientSrcRank rank of the source client
  \param [in] info info array to receive
  \param [in] infoSize info array size to receive
  \param [in] clientIntraComm communication group of client
  \param [in] requestRecvInfo list of receiving request
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
                                                        const ep_lib::MPI_Comm& clientIntraComm,
                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
{
  ep_lib::MPI_Request request;
  requestRecvInfo.push_back(request);

  MPI_Irecv(info, infoSize, MPI_CHAR,
            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
}

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive.
  This computation is only based on the hierarchical structure of the distributed hash table (e.g. the number of processes).
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int currentGroup, offset;
  size_t e,f;

  // Do a simple math [a,b) intersect [c,d)
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

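  // [e,f] is the hash range this rank owned at the parent level; the ranks of the current
  // group whose ranges intersect [e,f] are the processes this rank will have to send to.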
  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup) -1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}

/*!
  Compute the number of clients as well as the number of elements each client will receive when returning the search results
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each of these clients
  \param [in] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each of these clients
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  recvNbElements.resize(recvNbRank.size());
  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());

  int nRequest = 0;
  for (int idx = 0; idx < recvNbRank.size(); ++idx)
  {
    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  for (int idx = 0; idx < sendNbRank.size(); ++idx)
  {
    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
    ++nRequest;
  }

  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
}

/*!
  Send and receive the number of processes each process needs to listen to, as well as the number
  of indices it will receive, during the initialization phase
  \param [in] level current level
  \param [in] sendNbRank ranks of the clients to send to
  \param [in] sendNbElements number of elements to send to each of these clients
  \param [out] recvNbRank ranks of the clients to receive from
  \param [out] recvNbElements number of elements to receive from each of these clients
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  std::vector<int> sendBuff(sendBuffSize*2);
  int recvBuffSize = recvRank.size();
  std::vector<int> recvBuff(recvBuffSize*2,0);

  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);

  int my_rank;
  MPI_Comm_rank(this->internalComm_, &my_rank);

  int nRequest = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Irecv(&recvBuff[0]+2*idx, 2, MPI_INT,
              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    printf("proc %d recv from proc %d with tag %d\n", my_rank, recvRank[idx], MPI_DHT_INDEX_0);
    ++nRequest;
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
    printf("proc %d send to proc %d with tag %d\n", my_rank, sendRank[idx], MPI_DHT_INDEX_0);
    ++nRequest;
  }

  //MPI_Barrier(this->internalComm_);

  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);

  int nbRecvRank = 0, nbRecvElements = 0;
  recvNbRank.clear();
  recvNbElements.clear();
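  // Each received (flag, count) pair mirrors what the sender stored in sendBuff: the first
  // value is the sender's sendNbRank entry (a 0/1 marker in the callers of this function),
  // so only peers with a non-zero flag are kept, together with their element counts.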
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    if (0 != recvBuff[2*idx])
    {
      recvNbRank.push_back(recvRank[idx]);
      recvNbElements.push_back(recvBuff[2*idx+1]);
    }
  }
}

}