source: XIOS/dev/branch_yushan_merged/src/client_client_dht_template_impl.hpp @ 1203

Last change on this file since 1203 was 1203, checked in by yushan, 7 years ago

prep to merge with trunk @1200

File size: 34.3 KB
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 March 2016
6
7   \brief Distributed hashed table implementation.
8 */
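// Added commentary (not part of the original file): this template implements a
// client-to-client distributed hash table. Every global index is hashed into
// size_t space, each rank owns one contiguous range of that space (see
// computeHashIndex below), and both the initial (index, info) insertion and the
// later lookups are resolved with point-to-point MPI messages, possibly over
// several hierarchy levels provided by the base class H.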
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12#ifdef _usingEP
13#include "ep_declaration.hpp"
14#include "ep_lib.hpp"
15#endif
16
17
18namespace xios
19{
20template<typename T, typename H>
21CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const ep_lib::MPI_Comm& clientIntraComm)
22  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
23{
24  MPI_Comm_size(clientIntraComm, &nbClient_);
25  this->computeMPICommLevel();
26  int nbLvl = this->getNbLevel();
27  sendRank_.resize(nbLvl);
28  recvRank_.resize(nbLvl);
29}
30
31/*!
32  Constructor with initial distribution information and the corresponding index
33  Each client (process) holds a piece of information as well as the attached index; the index
34will be redistributed (projected) into size_t space along with the associated information.
35  \param [in] indexInfoMap initial index and information mapping
36  \param [in] clientIntraComm communicator of clients
37  \param [in] hierarLvl level of hierarchy
38*/
39template<typename T, typename H>
40CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2InfoTypeMap& indexInfoMap,
41                                                        const ep_lib::MPI_Comm& clientIntraComm)
42  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
43{
44  MPI_Comm_size(clientIntraComm, &nbClient_);
45  this->computeMPICommLevel();
46  int nbLvl = this->getNbLevel();
47  sendRank_.resize(nbLvl);
48  recvRank_.resize(nbLvl);
49  Index2VectorInfoTypeMap indexToVecInfoMap;
50  indexToVecInfoMap.rehash(std::ceil(indexInfoMap.size()/indexToVecInfoMap.max_load_factor()));
51  typename Index2InfoTypeMap::const_iterator it = indexInfoMap.begin(), ite = indexInfoMap.end();
52  for (; it != ite; ++it) indexToVecInfoMap[it->first].push_back(it->second);
53  computeDistributedIndex(indexToVecInfoMap, clientIntraComm, nbLvl-1);
54}
55
56/*!
57  Constructor with initial distribution information and the corresponding index
58  Each client (process) holds a piece of information as well as the attached index; the index
59will be redistributed (projected) into size_t space along with the associated information.
60  \param [in] indexInfoMap initial index and information mapping
61  \param [in] clientIntraComm communicator of clients
62  \param [in] hierarLvl level of hierarchy
63*/
64template<typename T, typename H>
65CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const Index2VectorInfoTypeMap& indexInfoMap,
66                                                        const ep_lib::MPI_Comm& clientIntraComm)
67  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_(), nbClient_(0)
68{
69  MPI_Comm_size(clientIntraComm, &nbClient_);
70  this->computeMPICommLevel();
71  int nbLvl = this->getNbLevel();
72  sendRank_.resize(nbLvl);
73  recvRank_.resize(nbLvl);
74  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
75}
76
77template<typename T, typename H>
78CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
79{
80}
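// ---------------------------------------------------------------------------
// Illustrative usage sketch (added commentary, not part of the original file).
// Names not defined in this file are assumptions: the hierarchy policy H and
// the accessor used to read the result (called getInfoIndexMap() here purely
// for illustration) are declared in the class header.
//
//   ep_lib::MPI_Comm comm = ...;                          // client intra-communicator
//   CClientClientDHTTemplate<int, H>::Index2InfoTypeMap localMap;
//   localMap[someGlobalIndex] = someServerRank;           // data this rank contributes
//   CClientClientDHTTemplate<int, H> dht(localMap, comm); // collective construction
//
//   CArray<size_t,1> wanted(nbWanted);                    // indices to look up
//   dht.computeIndexInfoMapping(wanted);                  // collective query
//   // const auto& found = dht.getInfoIndexMap();         // assumed accessor
// ---------------------------------------------------------------------------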
81
82/*!
83  Compute mapping between indices and information corresponding to these indices
84  \param [in] indices indices a proc has
85*/
86template<typename T, typename H>
87void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
88{
89  int nbLvl = this->getNbLevel();
90  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
91}
92
93/*!
94    Compute mapping between indices and information corresponding to these indices
95for each level of hierarchical DHT. Recursive function
96   \param [in] indices indices a proc has
97   \param [in] commLevel communicator of current level
98   \param [in] level current level
99*/
100template<typename T, typename H>
101void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
102                                                                 const ep_lib::MPI_Comm& commLevel,
103                                                                 int level)
104{
105  int clientRank;
106  MPI_Comm_rank(commLevel,&clientRank);
107  ep_lib::MPI_Barrier(commLevel);
108  int groupRankBegin = this->getGroupBegin()[level];
109  int nbClient = this->getNbInGroup()[level];
110  std::vector<size_t> hashedIndex;
111  computeHashIndex(hashedIndex, nbClient);
112
113  size_t ssize = indices.numElements(), hashedVal;
114
115  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
116                                      iteClientHash = hashedIndex.end();
117  std::vector<int> sendBuff(nbClient,0);
118  std::vector<int> sendNbIndexBuff(nbClient,0);
119
120  // Number of global indices whose mapping servers are on other clients
121  int nbIndexToSend = 0;
122  size_t index;
123  HashXIOS<size_t> hashGlobalIndex;
124  boost::unordered_map<size_t,int> nbIndices;
125  nbIndices.rehash(std::ceil(ssize/nbIndices.max_load_factor()));
126  for (int i = 0; i < ssize; ++i)
127  {
128    index = indices(i);
129    if (0 == nbIndices.count(index))
130    {
131      hashedVal  = hashGlobalIndex(index);
132      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
133      int indexClient = std::distance(itbClientHash, itClientHash)-1;
134      ++sendNbIndexBuff[indexClient];
135      nbIndices[index] = 1;
136    }
137  }
138
139  boost::unordered_map<int, size_t* > client2ClientIndex;
140  for (int idx = 0; idx < nbClient; ++idx)
141  {
142    if (0 != sendNbIndexBuff[idx])
143    {
144      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
145      nbIndexToSend += sendNbIndexBuff[idx];
146      sendBuff[idx] = 1;
147      sendNbIndexBuff[idx] = 0;
148    }
149  }
150
151  for (int i = 0; i < ssize; ++i)
152  {
153    index = indices(i);
154    if (1 == nbIndices[index])
155    {
156      hashedVal  = hashGlobalIndex(index);
157      itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
158      int indexClient = std::distance(itbClientHash, itClientHash)-1;
159      client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
160      ++sendNbIndexBuff[indexClient];
161      ++nbIndices[index];
162    }
163  }
164
165  std::vector<int> recvRankClient, recvNbIndexClientCount;
166  sendRecvRank(level, sendBuff, sendNbIndexBuff,
167               recvRankClient, recvNbIndexClientCount);
168
169  int recvNbIndexCount = 0;
170  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
171    recvNbIndexCount += recvNbIndexClientCount[idx];
172
173  unsigned long* recvIndexBuff;
174  if (0 != recvNbIndexCount)
175    recvIndexBuff = new unsigned long[recvNbIndexCount];
176
177  int request_size = 0;
178
179  int currentIndex = 0;
180  int nbRecvClient = recvRankClient.size();
181
182  int position = 0;
183
184  for (int idx = 0; idx < nbRecvClient; ++idx)
185  {
186    if (0 != recvNbIndexClientCount[idx])
187    {
188      request_size++;
189    }
190  }
191
192  request_size += client2ClientIndex.size();
193
194
195  std::vector<ep_lib::MPI_Request> request(request_size);
196
197  std::vector<int>::iterator itbRecvIndex = recvRankClient.begin(), itRecvIndex,
198                             iteRecvIndex = recvRankClient.end(),
199                           itbRecvNbIndex = recvNbIndexClientCount.begin(),
200                           itRecvNbIndex;
201 
202 
203  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
204                                                iteIndex = client2ClientIndex.end();
205  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
206  {
207    MPI_Isend(itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG, itIndex->first, MPI_DHT_INDEX, commLevel, &request[position]);
208    position++;
209    //sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
210  }
211   
212  for (int idx = 0; idx < nbRecvClient; ++idx)
213  {
214    if (0 != recvNbIndexClientCount[idx])
215    {
216      MPI_Irecv(recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], MPI_UNSIGNED_LONG,
217            recvRankClient[idx], MPI_DHT_INDEX, commLevel, &request[position]);
218      position++;
219      //recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
220    }
221    currentIndex += recvNbIndexClientCount[idx];
222  }
223
224 
225  std::vector<ep_lib::MPI_Status> status(request_size);
226  MPI_Waitall(request.size(), &request[0], &status[0]);
227
228
229  CArray<size_t,1>* tmpGlobalIndex;
230  if (0 != recvNbIndexCount)
231    tmpGlobalIndex = new CArray<size_t,1>(recvIndexBuff, shape(recvNbIndexCount), neverDeleteData);
232  else
233    tmpGlobalIndex = new CArray<size_t,1>();
234
235  // OK, we go to the next level and do something recursive
236  if (0 < level)
237  {
238    --level;
239    computeIndexInfoMappingLevel(*tmpGlobalIndex, this->internalComm_, level);
240     
241  }
242  else // Now, we are in the last level where necessary mappings are.
243    indexToInfoMappingLevel_= (index2InfoMapping_);
244
245  typename Index2VectorInfoTypeMap::const_iterator iteIndexToInfoMap = indexToInfoMappingLevel_.end(), itIndexToInfoMap;
246  std::vector<int> sendNbIndexOnReturn(nbRecvClient,0);
247  currentIndex = 0;
248  for (int idx = 0; idx < nbRecvClient; ++idx)
249  {
250    for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
251    {
252      itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
253      if (iteIndexToInfoMap != itIndexToInfoMap)
254        sendNbIndexOnReturn[idx] += itIndexToInfoMap->second.size();
255    }
256    currentIndex += recvNbIndexClientCount[idx];
257  }
258
259  std::vector<int> recvRankOnReturn(client2ClientIndex.size());
260  std::vector<int> recvNbIndexOnReturn(client2ClientIndex.size(),0);
261  int indexIndex = 0;
262  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex, ++indexIndex)
263  {
264    recvRankOnReturn[indexIndex] = itIndex->first;
265  }
266  sendRecvOnReturn(recvRankClient, sendNbIndexOnReturn,
267                   recvRankOnReturn, recvNbIndexOnReturn);
268
269  int recvNbIndexCountOnReturn = 0;
270  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
271    recvNbIndexCountOnReturn += recvNbIndexOnReturn[idx];
272
273  unsigned long* recvIndexBuffOnReturn;
274  unsigned char* recvInfoBuffOnReturn;
275  if (0 != recvNbIndexCountOnReturn)
276  {
277    recvIndexBuffOnReturn = new unsigned long[recvNbIndexCountOnReturn];
278    recvInfoBuffOnReturn = new unsigned char[recvNbIndexCountOnReturn*ProcessDHTElement<InfoType>::typeSize()];
279  }
280
281  request_size = 0;
282  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
283  {
284    if (0 != recvNbIndexOnReturn[idx])
285    {
286      request_size += 2;
287    }
288  }
289
290  for (int idx = 0; idx < nbRecvClient; ++idx)
291  {
292    if (0 != sendNbIndexOnReturn[idx])
293    {
294      request_size += 2;
295    }
296  }
297
298  std::vector<ep_lib::MPI_Request> requestOnReturn(request_size);
299  currentIndex = 0;
300  position = 0;
301  for (int idx = 0; idx < recvRankOnReturn.size(); ++idx)
302  {
303    if (0 != recvNbIndexOnReturn[idx])
304    {
305      //recvIndexFromClients(recvRankOnReturn[idx], recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], commLevel, requestOnReturn);
306      MPI_Irecv(recvIndexBuffOnReturn+currentIndex, recvNbIndexOnReturn[idx], MPI_UNSIGNED_LONG,
307            recvRankOnReturn[idx], MPI_DHT_INDEX, commLevel, &requestOnReturn[position]);
308      position++;
309      //recvInfoFromClients(recvRankOnReturn[idx],
310      //                    recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
311      //                    recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(),
312      //                    commLevel, requestOnReturn);
313      MPI_Irecv(recvInfoBuffOnReturn+currentIndex*ProcessDHTElement<InfoType>::typeSize(), 
314                recvNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), MPI_CHAR,
315                recvRankOnReturn[idx], MPI_DHT_INFO, commLevel, &requestOnReturn[position]);
316      position++;
317    }
318    currentIndex += recvNbIndexOnReturn[idx];
319  }
320
321  boost::unordered_map<int,unsigned char*> client2ClientInfoOnReturn;
322  boost::unordered_map<int,size_t*> client2ClientIndexOnReturn;
323  currentIndex = 0;
324  for (int idx = 0; idx < nbRecvClient; ++idx)
325  {
326    if (0 != sendNbIndexOnReturn[idx])
327    {
328      int rank = recvRankClient[idx];
329      client2ClientIndexOnReturn[rank] = new unsigned long [sendNbIndexOnReturn[idx]];
330      client2ClientInfoOnReturn[rank] = new unsigned char [sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize()];
331      unsigned char* tmpInfoPtr = client2ClientInfoOnReturn[rank];
332      int infoIndex = 0;
333      int nb = 0;
334      for (int i = 0; i < recvNbIndexClientCount[idx]; ++i)
335      {
336        itIndexToInfoMap = indexToInfoMappingLevel_.find(*(recvIndexBuff+currentIndex+i));
337        if (iteIndexToInfoMap != itIndexToInfoMap)
338        {
339          const std::vector<InfoType>& infoTmp = itIndexToInfoMap->second;
340          for (int k = 0; k < infoTmp.size(); ++k)
341          {
342            client2ClientIndexOnReturn[rank][nb] = itIndexToInfoMap->first;
343            ProcessDHTElement<InfoType>::packElement(infoTmp[k], tmpInfoPtr, infoIndex);
344            ++nb;
345          }
346        }
347      }
348
349      //sendIndexToClients(rank, client2ClientIndexOnReturn[rank],
350      //                   sendNbIndexOnReturn[idx], commLevel, requestOnReturn);
351      MPI_Isend(client2ClientIndexOnReturn[rank], sendNbIndexOnReturn[idx], MPI_UNSIGNED_LONG,
352            rank, MPI_DHT_INDEX, commLevel, &requestOnReturn[position]);
353      position++;
354      //sendInfoToClients(rank, client2ClientInfoOnReturn[rank],
355      //                  sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), commLevel, requestOnReturn);
356      MPI_Isend(client2ClientInfoOnReturn[rank], sendNbIndexOnReturn[idx]*ProcessDHTElement<InfoType>::typeSize(), MPI_CHAR,
357            rank, MPI_DHT_INFO, commLevel, &requestOnReturn[position]);
358      position++;
359    }
360    currentIndex += recvNbIndexClientCount[idx];
361  }
362
363  std::vector<ep_lib::MPI_Status> statusOnReturn(requestOnReturn.size());
364  MPI_Waitall(requestOnReturn.size(), &requestOnReturn[0], &statusOnReturn[0]);
365
366  Index2VectorInfoTypeMap indexToInfoMapping;
367  indexToInfoMapping.rehash(std::ceil(recvNbIndexCountOnReturn/indexToInfoMapping.max_load_factor()));
368  int infoIndex = 0;
369  InfoType unpackedInfo;
370  for (int idx = 0; idx < recvNbIndexCountOnReturn; ++idx)
371  {
372    ProcessDHTElement<InfoType>::unpackElement(unpackedInfo, recvInfoBuffOnReturn, infoIndex);
373    indexToInfoMapping[recvIndexBuffOnReturn[idx]].push_back(unpackedInfo);
374  }
375
376  indexToInfoMappingLevel_.swap(indexToInfoMapping);
377  if (0 != recvNbIndexCount) delete [] recvIndexBuff;
378  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
379                                                        it != client2ClientIndex.end(); ++it)
380      delete [] it->second;
381  delete tmpGlobalIndex;
382
383  if (0 != recvNbIndexCountOnReturn)
384  {
385    delete [] recvIndexBuffOnReturn;
386    delete [] recvInfoBuffOnReturn;
387  }
388
389  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfoOnReturn.begin();
390                                                               it != client2ClientInfoOnReturn.end(); ++it)
391      delete [] it->second;
392
393  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndexOnReturn.begin();
394                                            it != client2ClientIndexOnReturn.end(); ++it)
395      delete [] it->second;
396}
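// Added commentary (not part of the original file), summarizing the exchange
// performed by computeIndexInfoMappingLevel above:
//   1. Hash every requested index and bucket it by owner rank
//      (computeHashIndex + std::upper_bound).
//   2. Exchange per-rank message counts with sendRecvRank.
//   3. Post MPI_Isend/MPI_Irecv for the index buffers (tag MPI_DHT_INDEX) and
//      wait for completion.
//   4. Either recurse to the next hierarchy level with the received indices,
//      or, at the last level, look them up in index2InfoMapping_.
//   5. Return the matching (index, info) pairs to the requesting ranks with
//      tags MPI_DHT_INDEX / MPI_DHT_INFO and unpack them into
//      indexToInfoMappingLevel_.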
397
398/*!
399  Compute the hash index distribution over the whole size_t space; each client is then responsible for one range of this distribution
400*/
401template<typename T, typename H>
402void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
403{
404  // Compute range of hash index for each client
405  hashedIndex.resize(nbClient+1);
406  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
407  size_t nbHashIndex;
408  hashedIndex[0] = 0;
409  for (int i = 1; i < nbClient; ++i)
410  {
411    nbHashIndex = nbHashIndexMax / nbClient;
412    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
413    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
414  }
415  hashedIndex[nbClient] = nbHashIndexMax;
416}
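// Worked example (added commentary, not part of the original file): the size_t
// space is split into nbClient nearly equal, contiguous ranges, so client k is
// responsible for hash values in [hashedIndex[k], hashedIndex[k+1]).  The owner
// of a hashed index is found the same way the callers of this function do it:
//
//   std::vector<size_t> bounds;
//   computeHashIndex(bounds, nbClient);
//   size_t h = HashXIOS<size_t>()(globalIndex);
//   int owner = std::distance(bounds.begin(),
//                             std::upper_bound(bounds.begin(), bounds.end(), h)) - 1;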
417
418/*!
419  Compute the distribution of global indices among the servers
420  Each client already holds a piece of information and its associated index.
421This information will be redistributed among processes by projecting indices into size_t space;
422the corresponding information will also be distributed over this space.
423After the redistribution, each client holds the rearranged indices and their corresponding information.
424  \param [in] indexInfoMap index and its corresponding info (usually server index)
425  \param [in] commLevel communicator of current level
426  \param [in] level current level
427*/
428template<typename T, typename H>
429void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const Index2VectorInfoTypeMap& indexInfoMap,
430                                                            const ep_lib::MPI_Comm& commLevel,
431                                                            int level)
432{
433  int clientRank;
434  MPI_Comm_rank(commLevel,&clientRank);
435  computeSendRecvRank(level, clientRank);
436  ep_lib::MPI_Barrier(commLevel);
437
438  int groupRankBegin = this->getGroupBegin()[level];
439  int nbClient = this->getNbInGroup()[level];
440  std::vector<size_t> hashedIndex;
441  computeHashIndex(hashedIndex, nbClient);
442
443  std::vector<int> sendBuff(nbClient,0);
444  std::vector<int> sendNbIndexBuff(nbClient,0);
445  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
446                                      iteClientHash = hashedIndex.end();
447  typename Index2VectorInfoTypeMap::const_iterator itb = indexInfoMap.begin(),it,
448                                                   ite = indexInfoMap.end();
449  HashXIOS<size_t> hashGlobalIndex;
450
451  // Compute size of sending and receiving buffers
452  for (it = itb; it != ite; ++it)
453  {
454    size_t hashIndex = hashGlobalIndex(it->first);
455    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
456    int indexClient = std::distance(itbClientHash, itClientHash)-1;
457    sendNbIndexBuff[indexClient] += it->second.size();
458  }
459
460  boost::unordered_map<int, size_t*> client2ClientIndex;
461  boost::unordered_map<int, unsigned char*> client2ClientInfo;
462  for (int idx = 0; idx < nbClient; ++idx)
463  {
464    if (0 != sendNbIndexBuff[idx])
465    {
466      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
467      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
468      sendNbIndexBuff[idx] = 0;
469      sendBuff[idx] = 1;
470    }
471  }
472
473  std::vector<int> sendNbInfo(nbClient,0);
474  for (it = itb; it != ite; ++it)
475  {
476    const std::vector<InfoType>& infoTmp = it->second;
477    size_t hashIndex = hashGlobalIndex(it->first);
478    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
479    int indexClient = std::distance(itbClientHash, itClientHash)-1;
480    for (int idx = 0; idx < infoTmp.size(); ++idx)
481    {
482      client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
483      ProcessDHTElement<InfoType>::packElement(infoTmp[idx], client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
484      ++sendNbIndexBuff[indexClient];
485    }
486  }
487
488  // Calculate from how many clients each client receives messages.
489  // Calculate size of buffers for receiving messages
490  std::vector<int> recvRankClient, recvNbIndexClientCount;
491  sendRecvRank(level, sendBuff, sendNbIndexBuff,
492               recvRankClient, recvNbIndexClientCount);
493
494  int recvNbIndexCount = 0;
495  for (int idx = 0; idx < recvNbIndexClientCount.size(); ++idx)
496  { 
497    recvNbIndexCount += recvNbIndexClientCount[idx];
498  }
499
500  unsigned long* recvIndexBuff;
501  unsigned char* recvInfoBuff;
502  if (0 != recvNbIndexCount)
503  {
504    recvIndexBuff = new unsigned long[recvNbIndexCount];
505    recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
506  }
507
508  // If a client holds indices and the corresponding information which do not belong to it,
509  // it will send a message to the correct clients.
510  // Contents of the message are the indices and their corresponding information
511  int request_size = 0; 
512  int currentIndex = 0;
513  int nbRecvClient = recvRankClient.size();
514  int current_pos = 0; 
515
516  for (int idx = 0; idx < nbRecvClient; ++idx)
517  {
518    if (0 != recvNbIndexClientCount[idx])
519    {
520      request_size += 2;
521    }
522    //currentIndex += recvNbIndexClientCount[idx];
523  }
524
525  request_size += client2ClientIndex.size();
526  request_size += client2ClientInfo.size();
527
528
529
530  std::vector<ep_lib::MPI_Request> request(request_size);
531 
532  //unsigned long* tmp_send_buf_long[client2ClientIndex.size()];
533  //unsigned char* tmp_send_buf_char[client2ClientInfo.size()];
534 
535  int info_position = 0;
536  int index_position = 0;
537
538
539  boost::unordered_map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
540                                                iteIndex = client2ClientIndex.end();
541  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
542  {
543    //sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, request);
544
545    //tmp_send_buf_long[index_position] = new unsigned long[sendNbIndexBuff[itIndex->first-groupRankBegin]];
546    //for(int i=0; i<sendNbIndexBuff[itIndex->first-groupRankBegin]; i++)
547    //{
548    //  tmp_send_buf_long[index_position][i] = (static_cast<unsigned long * >(itIndex->second))[i];
549    //}
550    //MPI_Isend(tmp_send_buf_long[current_pos], sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG,
551    MPI_Isend(itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], MPI_UNSIGNED_LONG,
552              itIndex->first, MPI_DHT_INDEX, commLevel, &request[current_pos]);
553    current_pos++; 
554    index_position++;
555
556  }
557
558  boost::unordered_map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
559                                                      iteInfo = client2ClientInfo.end();
560  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
561  {
562    //sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, request);
563
564    //tmp_send_buf_char[info_position] = new unsigned char[sendNbInfo[itInfo->first-groupRankBegin]];
565    //for(int i=0; i<sendNbInfo[itInfo->first-groupRankBegin]; i++)
566    //{
567    //  tmp_send_buf_char[info_position][i] = (static_cast<unsigned char * >(itInfo->second))[i];
568    //}
569
570    MPI_Isend(itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], MPI_CHAR,
571              itInfo->first, MPI_DHT_INFO, commLevel, &request[current_pos]);
572
573    current_pos++;
574    info_position++;
575  }
576 
577  for (int idx = 0; idx < nbRecvClient; ++idx)
578  {
579    if (0 != recvNbIndexClientCount[idx])
580    {
581      //recvIndexFromClients(recvRankClient[idx], recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], commLevel, request);
582      MPI_Irecv(recvIndexBuff+currentIndex, recvNbIndexClientCount[idx], MPI_UNSIGNED_LONG,
583                recvRankClient[idx], MPI_DHT_INDEX, commLevel, &request[current_pos]);
584      current_pos++;
585     
586     
587      MPI_Irecv(recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(), 
588                recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(), 
589                MPI_CHAR, recvRankClient[idx], MPI_DHT_INFO, commLevel, &request[current_pos]);
590     
591      current_pos++;
592     
593
594
595      // recvInfoFromClients(recvRankClient[idx],
596      //                     recvInfoBuff+currentIndex*ProcessDHTElement<InfoType>::typeSize(),
597      //                     recvNbIndexClientCount[idx]*ProcessDHTElement<InfoType>::typeSize(),
598      //                     commLevel, request);
599    }
600    currentIndex += recvNbIndexClientCount[idx];
601  }
602
603  std::vector<ep_lib::MPI_Status> status(request.size());
604 
605  MPI_Waitall(request.size(), &request[0], &status[0]);
606 
607 
608  //for(int i=0; i<client2ClientInfo.size(); i++)
609  //  delete[] tmp_send_buf_char[i];
610
611 
612
613  //for(int i=0; i<client2ClientIndex.size(); i++)
614  //  delete[] tmp_send_buf_long[i];
615
616
617  Index2VectorInfoTypeMap indexToInfoMapping;
618  indexToInfoMapping.rehash(std::ceil(currentIndex/indexToInfoMapping.max_load_factor()));
619  currentIndex = 0;
620  InfoType infoValue;
621  int infoIndex = 0;
622  unsigned char* infoBuff = recvInfoBuff;
623  for (int idx = 0; idx < nbRecvClient; ++idx)
624  {
625    size_t index;
626    int count = recvNbIndexClientCount[idx];
627    for (int i = 0; i < count; ++i)
628    {
629      ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuff, infoIndex);
630      indexToInfoMapping[*(recvIndexBuff+currentIndex+i)].push_back(infoValue);
631    }
632    currentIndex += count;
633  }
634
635  if (0 != recvNbIndexCount)
636  {
637    delete [] recvIndexBuff;
638    delete [] recvInfoBuff;
639  }
640  for (boost::unordered_map<int,unsigned char*>::const_iterator it = client2ClientInfo.begin();
641                                                               it != client2ClientInfo.end(); ++it)
642      delete [] it->second;
643
644  for (boost::unordered_map<int,size_t*>::const_iterator it = client2ClientIndex.begin();
645                                                        it != client2ClientIndex.end(); ++it)
646      delete [] it->second;
647
648  // Ok, now do something recursive
649  if (0 < level)
650  {
651    --level;
652    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
653  }
654  else
655    index2InfoMapping_.swap(indexToInfoMapping);
656 
657}
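// Packing sketch (added commentary, not part of the original file): the info
// buffers exchanged above are flat byte arrays written and read with the
// ProcessDHTElement helpers; the integer cursor passed in is advanced by the
// helper for every packed or unpacked element.
//
//   unsigned char* buff = new unsigned char[count * ProcessDHTElement<InfoType>::typeSize()];
//   int writePos = 0;
//   ProcessDHTElement<InfoType>::packElement(value, buff, writePos);   // append one element
//   int readPos = 0;
//   InfoType out;
//   ProcessDHTElement<InfoType>::unpackElement(out, buff, readPos);    // read it back
//   delete [] buff;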
658
659/*!
660  Send message containing index to clients
661  \param [in] clientDestRank rank of destination client
662  \param [in] indices index to send
663  \param [in] indiceSize size of index array to send
664  \param [in] clientIntraComm communication group of client
665  \param [in] requestSendIndex list of sending requests
666*/
667template<typename T, typename H>
668void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
669                                                       const ep_lib::MPI_Comm& clientIntraComm,
670                                                       std::vector<ep_lib::MPI_Request>& requestSendIndex)
671{
672  printf("should not call this function sendIndexToClients"); 
673  ep_lib::MPI_Request request;
674  requestSendIndex.push_back(request);
675  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
676            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
677}
678
679/*!
680  Receive message containing indices from another client
681  \param [in] clientSrcRank rank of the source client
682  \param [in] indices index array to receive
683  \param [in] clientIntraComm communication group of client
684  \param [in] requestRecvIndex list of receiving requests
685*/
686template<typename T, typename H>
687void CClientClientDHTTemplate<T,H>::recvIndexFromClients(int clientSrcRank, size_t* indices, size_t indiceSize,
688                                                         const ep_lib::MPI_Comm& clientIntraComm,
689                                                         std::vector<ep_lib::MPI_Request>& requestRecvIndex)
690{
691  printf("should not call this function recvIndexFromClients");
692  ep_lib::MPI_Request request;
693  requestRecvIndex.push_back(request);
694  MPI_Irecv(indices, indiceSize, MPI_UNSIGNED_LONG,
695            clientSrcRank, MPI_DHT_INDEX, clientIntraComm, &(requestRecvIndex.back()));
696}
697
698/*!
699  Send message containing information to clients
700  \param [in] clientDestRank rank of destination client
701  \param [in] info info array to send
702  \param [in] infoSize info array size to send
703  \param [in] clientIntraComm communication group of client
704  \param [in] requestSendInfo list of sending requests
705*/
706template<typename T, typename H>
707void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
708                                                      const ep_lib::MPI_Comm& clientIntraComm,
709                                                      std::vector<ep_lib::MPI_Request>& requestSendInfo)
710{
711  printf("should not call this function sendInfoToClients");
712  ep_lib::MPI_Request request;
713  requestSendInfo.push_back(request);
714  MPI_Isend(info, infoSize, MPI_CHAR,
715            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
716}
717
718/*!
719  Receive message containing information from another client
720  \param [in] clientSrcRank rank of the source client
721  \param [in] info info array to receive
722  \param [in] infoSize info array size to receive
723  \param [in] clientIntraComm communication group of client
724  \param [in] requestRecvInfo list of receiving requests
725*/
726template<typename T, typename H>
727void CClientClientDHTTemplate<T,H>::recvInfoFromClients(int clientSrcRank, unsigned char* info, int infoSize,
728                                                        const ep_lib::MPI_Comm& clientIntraComm,
729                                                        std::vector<ep_lib::MPI_Request>& requestRecvInfo)
730{
731  printf("should not call this function recvInfoFromClients\n");
732  ep_lib::MPI_Request request;
733  requestRecvInfo.push_back(request);
734
735  MPI_Irecv(info, infoSize, MPI_CHAR,
736            clientSrcRank, MPI_DHT_INFO, clientIntraComm, &(requestRecvInfo.back()));
737}
738
739/*!
740  Compute how many processes one process needs to send to and from how many processes it will receive.
741  This computation is based only on the hierarchical structure of the distributed hash table (e.g. number of processes)
742*/
743template<typename T, typename H>
744void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
745{
746  int groupBegin = this->getGroupBegin()[level];
747  int nbInGroup  = this->getNbInGroup()[level];
748  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
749  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
750
751  std::vector<size_t> hashedIndexGroup;
752  computeHashIndex(hashedIndexGroup, nbInGroup);
753  size_t a = hashedIndexGroup[rank-groupBegin];
754  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
755
756  int currentGroup, offset;
757  size_t e,f;
758
759  // Simple interval intersection test between [a,b] and [c,d]
760  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
761  {
762    std::vector<size_t> hashedIndexGroupParent;
763    int nbInGroupParent = nbInGroupParents[idx];
764    if (0 != nbInGroupParent)
765      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
766    for (int i = 0; i < nbInGroupParent; ++i)
767    {
768      size_t c = hashedIndexGroupParent[i];
769      size_t d = hashedIndexGroupParent[i+1]-1;
770
771      if (!((d < a) || (b < c)))
772        recvRank_[level].push_back(groupParentBegin[idx]+i);
773    }
774
775    offset = rank - groupParentBegin[idx];
776    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
777    {
778      e = hashedIndexGroupParent[offset];
779      f = hashedIndexGroupParent[offset+1]-1;
780    }
781  }
782
783  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
784                                      iteHashGroup = hashedIndexGroup.end();
785  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
786  int begin = std::distance(itbHashGroup, itHashGroup)-1;
787  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
788  int end = std::distance(itbHashGroup, itHashGroup) -1;
789  sendRank_[level].resize(end-begin+1);
790  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
791}
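// Added commentary (not part of the original file): the test
// !((d < a) || (b < c)) used above is the standard overlap check for the
// inclusive ranges [a,b] and [c,d].  For example, a=10, b=19 overlaps
// c=15, d=24 (neither d<a nor b<c holds) but not c=20, d=29 (b < c).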
792
793/*!
794  Compute the number of clients from which each client will receive a reply, as well as the corresponding number of elements
795  \param [in] sendNbRank Ranks of clients to send to
796  \param [in] sendNbElements Number of elements to send to each client
797  \param [in] recvNbRank Ranks of clients to receive from
798  \param [out] recvNbElements Number of elements to receive from each client
799*/
800template<typename T, typename H>
801void CClientClientDHTTemplate<T,H>::sendRecvOnReturn(const std::vector<int>& sendNbRank, std::vector<int>& sendNbElements,
802                                                     const std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
803{
804  recvNbElements.resize(recvNbRank.size());
805  std::vector<ep_lib::MPI_Request> request(sendNbRank.size()+recvNbRank.size());
806  std::vector<ep_lib::MPI_Status> requestStatus(sendNbRank.size()+recvNbRank.size());
807
808  int nRequest = 0;
809 
810
811  for (int idx = 0; idx < sendNbRank.size(); ++idx)
812  {
813    MPI_Isend(&sendNbElements[0]+idx, 1, MPI_INT,
814              sendNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
815    ++nRequest;
816  }
817 
818  for (int idx = 0; idx < recvNbRank.size(); ++idx)
819  {
820    MPI_Irecv(&recvNbElements[0]+idx, 1, MPI_INT,
821              recvNbRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[nRequest]);
822    ++nRequest;
823  }
824
825  MPI_Waitall(sendNbRank.size()+recvNbRank.size(), &request[0], &requestStatus[0]);
826}
827
828/*!
829  Send and receive the number of processes each process needs to listen to, as well as the number
830  of indices it will receive, during the initialization phase
831  \param [in] level current level
832  \param [in] sendNbRank Ranks of clients to send to
833  \param [in] sendNbElements Number of elements to send to each client
834  \param [out] recvNbRank Ranks of clients to receive from
835  \param [out] recvNbElements Number of elements to receive from each client
836*/
837template<typename T, typename H>
838void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
839                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
840                                                 std::vector<int>& recvNbRank, std::vector<int>& recvNbElements)
841{
842  int groupBegin = this->getGroupBegin()[level];
843
844  int offSet = 0;
845  std::vector<int>& sendRank = sendRank_[level];
846  std::vector<int>& recvRank = recvRank_[level];
847  int sendBuffSize = sendRank.size();
848  std::vector<int> sendBuff(sendBuffSize*2);
849  int recvBuffSize = recvRank.size();
850  std::vector<int> recvBuff(recvBuffSize*2,0);
851
852  std::vector<ep_lib::MPI_Request> request(sendBuffSize+recvBuffSize);
853  std::vector<ep_lib::MPI_Status> requestStatus(sendBuffSize+recvBuffSize);
854  //ep_lib::MPI_Request request[sendBuffSize+recvBuffSize];
855  //ep_lib::MPI_Status requestStatus[sendBuffSize+recvBuffSize];
856 
857  int my_rank;
858  MPI_Comm_rank(this->internalComm_, &my_rank);
859 
860  int nRequest = 0;
861  for (int idx = 0; idx < recvBuffSize; ++idx)
862  {
863    MPI_Irecv(&recvBuff[2*idx], 2, MPI_INT,
864              recvRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
865    ++nRequest;
866  }
867 
868
869  for (int idx = 0; idx < sendBuffSize; ++idx)
870  {
871    offSet = sendRank[idx]-groupBegin;
872    sendBuff[idx*2] = sendNbRank[offSet];
873    sendBuff[idx*2+1] = sendNbElements[offSet];
874  }
875 
876 
877
878  for (int idx = 0; idx < sendBuffSize; ++idx)
879  {
880    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
881              sendRank[idx], MPI_DHT_INDEX_0, this->internalComm_, &request[nRequest]);
882    ++nRequest;
883  }
884 
885 
886
887  //MPI_Barrier(this->internalComm_);
888
889  MPI_Waitall(sendBuffSize+recvBuffSize, &request[0], &requestStatus[0]);
890  //MPI_Waitall(sendBuffSize+recvBuffSize, request, requestStatus);
891
892 
893  int nbRecvRank = 0, nbRecvElements = 0;
894  recvNbRank.clear();
895  recvNbElements.clear();
896  for (int idx = 0; idx < recvBuffSize; ++idx)
897  {
898    if (0 != recvBuff[2*idx])
899    {
900      recvNbRank.push_back(recvRank[idx]);
901      recvNbElements.push_back(recvBuff[2*idx+1]);
902    }
903  }
904
905
906 
907 
908}
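// Added commentary (not part of the original file): each pair of ints
// exchanged by sendRecvRank is { flag, nbElements }, where flag is the 0/1
// value the caller stored in sendBuff and nbElements the number of indices it
// will send to that partner.  On the receiving side only pairs with a non-zero
// flag are kept, which produces the recvRankClient / recvNbIndexClientCount
// vectors used by the callers.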
909
910}
911