source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 833

Last change on this file since 833 was 833, checked in by mhnguyen, 9 years ago

Improvements for dht

+) Implement adaptive hierarchy for dht; the level of hierarchy depends on the number of processes
+) Remove some redundant code

Test
+) On Curie
+) All tests are correct

File size: 29.5 KB
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 23 March 2016
6
7   \brief Distributed hash table implementation.
8 */
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12
13namespace xios
14{
15/*!
16  Constructor with initial distribution information and the corresponding index
17  Each client (process) holds a piece of information as well as the attached index; the index
18will be redistributed (projected) into the size_t space along with the associated information.
19  \param [in] indexInfoMap initial index and information mapping
20  \param [in] clientIntraComm communicator of clients
21  \param [in] hierarLvl level of hierarchy
22*/
23template<typename T, typename H>
24CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const boost::unordered_map<size_t,T>& indexInfoMap,
25                                                        const MPI_Comm& clientIntraComm,
26                                                        int hierarLvl)
27  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_()
28{
29  this->computeMPICommLevel();
30  int nbLvl = this->getNbLevel();
31  sendRank_.resize(nbLvl);
32  recvRank_.resize(nbLvl);
33  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
34}
35
36template<typename T, typename H>
37CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
38{
39}
40
41/*!
42  Compute mapping between indices and information corresponding to these indices
43  \param [in] indices indices held by the process
44*/
45template<typename T, typename H>
46void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
47{
48  int nbLvl = this->getNbLevel();
49  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
50}
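// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): a minimal usage sketch.
// It assumes the information type T is an int (e.g. a server rank) and that H
// is the hierarchy policy declared with this class template in
// client_client_dht_template.hpp; the helper name below is illustrative only.
// After computeIndexInfoMapping() returns, the member indexToInfoMappingLevel_
// holds the information found for the requested indices.
template<typename H>
void clientClientDHTUsageSketch(const MPI_Comm& intraComm, int myServerRank)
{
  // Indices this process contributes, with their attached information.
  boost::unordered_map<size_t,int> localIndexInfo;
  localIndexInfo[1000] = myServerRank;
  localIndexInfo[2048] = myServerRank;

  // Build the DHT: the (index, info) pairs are hashed into the size_t space and
  // redistributed across the processes of intraComm (here with 2 hierarchy levels).
  CClientClientDHTTemplate<int,H> dht(localIndexInfo, intraComm, 2);

  // Look up the information attached to indices that may live on other processes.
  CArray<size_t,1> wanted(2);
  wanted(0) = 1000;
  wanted(1) = 777;
  dht.computeIndexInfoMapping(wanted);
}
// ---------------------------------------------------------------------------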
51
52/*!
53    Compute mapping between indices and information corresponding to these indices
54for each level of the hierarchical DHT. Recursive function.
55   \param [in] indices indices held by the process
56   \param [in] commLevel communicator of current level
57   \param [in] level current level
58*/
59template<typename T, typename H>
60void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
61                                                                 const MPI_Comm& commLevel,
62                                                                 int level)
63{
64  int clientRank;
65  MPI_Comm_rank(commLevel,&clientRank);
66  int groupRankBegin = this->getGroupBegin()[level];
67  int nbClient = this->getNbInGroup()[level];
68  std::vector<size_t> hashedIndex;
69  computeHashIndex(hashedIndex, nbClient);
70
71  size_t ssize = indices.numElements(), hashedVal;
72
73  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
74                                      iteClientHash = hashedIndex.end();
75  std::vector<int> sendBuff(nbClient,0);
76  std::vector<int> sendNbIndexBuff(nbClient,0);
77
78  // Number of global indices whose mapping information is held on other clients
79  int nbIndexToSend = 0;
80  size_t index;
81  HashXIOS<size_t> hashGlobalIndex;
82  for (int i = 0; i < ssize; ++i)
83  {
84    index = indices(i);
85    hashedVal  = hashGlobalIndex(index);
86    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
87    int indexClient = std::distance(itbClientHash, itClientHash)-1;
88    ++sendNbIndexBuff[indexClient];
89  }
90
91  std::map<int, size_t* > client2ClientIndex;
92  for (int idx = 0; idx < nbClient; ++idx)
93  {
94    if (0 != sendNbIndexBuff[idx])
95    {
96      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
97      nbIndexToSend += sendNbIndexBuff[idx];
98      sendBuff[idx] = 1;
99      sendNbIndexBuff[idx] = 0;
100    }
101  }
102
103  for (int i = 0; i < ssize; ++i)
104  {
105    index = indices(i);
106    hashedVal  = hashGlobalIndex(index);
107    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
108    {
109      int indexClient = std::distance(itbClientHash, itClientHash)-1;
110      {
111        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
112        ++sendNbIndexBuff[indexClient];
113      }
114    }
115  }
116
117  int recvNbClient, recvNbIndexCount;
118  sendRecvRank(level, sendBuff, sendNbIndexBuff,
119               recvNbClient, recvNbIndexCount);
120
121  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
122                                                iteIndex = client2ClientIndex.end();
123
124  std::list<MPI_Request> sendIndexRequest;
125  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
126     sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendIndexRequest);
127
128  int nbDemandingClient = recvNbClient; //recvBuff[clientRank],
129  int nbSendBuffInfoReceived = 0;
130
131  // Receive demands as well as responses from other clients
132  // A demand message contains global indices, while a response carries the server index information
133  // Buffer to receive demands from other clients; it is allocated only if demands arrive
134  // In some cases duplicate indices are demanded, so the maximum size of the demand buffer must be determined
135  unsigned long* recvBuffIndex = 0;
136  int maxNbIndexDemandedFromOthers = recvNbIndexCount;
137  if (0 != maxNbIndexDemandedFromOthers)
138    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];
139
140  // Buffer to receive responses from other clients; it is allocated only if this client demands indices from other clients
141  unsigned char* recvBuffInfo = 0;
142  int nbIndexReceivedFromOthers = nbIndexToSend;
143  if (0 != nbIndexReceivedFromOthers)
144    recvBuffInfo = new unsigned char[nbIndexReceivedFromOthers*ProcessDHTElement<InfoType>::typeSize()];
145
146  std::map<int, MPI_Request>::iterator itRequest;
147  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;
148
149  int countIndex = 0;  // Running count of indices already placed in the receiving buffer
150  std::map<int, MPI_Request> requestRecvIndex; // Requests returned by MPI_Irecv for index messages
151
152  // Mapping from client rank to the beginning position in the receiving buffer for index messages from this client
153  std::map<int, unsigned long*> indexBuffBegin;
154
155  std::map<int,std::vector<size_t> > src2Index; // Temporary mapping holding the demand info (source rank and associated indices) at the current level
156
157  CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);
158
159  int k = 0;
160  while ((0 < nbDemandingClient) || (!sendIndexRequest.empty()))
161  {
162    // Just check whether a client has any demand from other clients.
163    // If it has, it should send responses to these client(s)
164    probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
165                                 countIndex, indexBuffBegin,
166                                 requestRecvIndex, commLevel);
167    if (0 < nbDemandingClient)
168    {
169      for (itRequest = requestRecvIndex.begin();
170           itRequest != requestRecvIndex.end(); ++itRequest)
171      {
172        int flagIndexGlobal, count;
173        MPI_Status statusIndexGlobal;
174
175        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
176        if (true == flagIndexGlobal)
177        {
178          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
179          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
180          unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
181          for (int i = 0; i < count; ++i)
182          {
183            src2Index[clientSourceRank].push_back(*(beginBuff+i));
184            tmpGlobalIndexOnClient(k) = *(beginBuff+i);
185            ++k;
186          }
187          --nbDemandingClient;
188
189          demandAlreadyReceived.push_back(clientSourceRank);
190        }
191      }
192      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
193        requestRecvIndex.erase(demandAlreadyReceived[i]);
194    }
195
196    testSendRequest(sendIndexRequest);
197  }
198
199  if (0 < level)
200  {
201    --level;
202    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->internalComm_, level);
203  }
204  else
205    indexToInfoMappingLevel_ = index2InfoMapping_;
206
207  std::map<int, std::vector<InfoType> > client2ClientInfo;
208  std::vector<unsigned char*> infoToSend(src2Index.size());
209  std::list<MPI_Request> sendInfoRequest;
210  std::map<int, std::vector<size_t> >::iterator itSrc2Idx = src2Index.begin(),
211                                                iteSrc2Idx = src2Index.end();
212  for (int i=0; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx, ++i)
213  {
214    int clientSourceRank = itSrc2Idx->first;
215    std::vector<size_t>& srcIdx = itSrc2Idx->second;
216    infoToSend[i] = new unsigned char [srcIdx.size()*ProcessDHTElement<InfoType>::typeSize()];
217    int infoIndex = 0;
218    for (int idx = 0; idx < srcIdx.size(); ++idx)
219    {
220      ProcessDHTElement<InfoType>::packElement(indexToInfoMappingLevel_[srcIdx[idx]], infoToSend[i], infoIndex);
221    }
222    sendInfoToClients(clientSourceRank, infoToSend[i], infoIndex, commLevel, sendInfoRequest);
223  }
224
225  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
226  int countInfo = 0; // Running count of info elements already placed in the receiving buffer
227  std::map<int, MPI_Request> requestRecvInfo;
228
229  // Mapping from client rank to the beginning position in the receiving buffer for info messages from this client
230  std::map<int, unsigned char*> infoBuffBegin;
231
232  while ((!sendInfoRequest.empty()) || (nbSendBuffInfoReceived < nbIndexReceivedFromOthers))
233  {
234    testSendRequest(sendInfoRequest);
235
236    // In some cases, a client needs to listen for responses from other clients about server information
237    // With this information, the client can fill in its global index-server map.
238    probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
239                                countInfo, infoBuffBegin,
240                                requestRecvInfo, commLevel);
241    for (itRequest = requestRecvInfo.begin();
242         itRequest != requestRecvInfo.end();
243         ++itRequest)
244    {
245      int flagInfo, count;
246      MPI_Status statusInfo;
247
248      MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
249      if (true == flagInfo)
250      {
251        MPI_Get_count(&statusInfo, MPI_CHAR, &count);
252        int actualCountInfo = count/infoTypeSize;
253        int clientSourceRank = statusInfo.MPI_SOURCE;
254        unsigned char* beginBuff = infoBuffBegin[clientSourceRank];
255        size_t* indexTmp = client2ClientIndex[clientSourceRank];
256        int infoIndex = 0;
257        for (int i = 0; i < actualCountInfo; ++i)
258        {
259          ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[indexTmp[i]], beginBuff, infoIndex);
260        }
261        nbSendBuffInfoReceived += actualCountInfo;
262        repondAlreadyReceived.push_back(clientSourceRank);
263      }
264    }
265
266    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
267      requestRecvInfo.erase(repondAlreadyReceived[i]);
268    repondAlreadyReceived.resize(0);
269  }
270
271  indexToInfoMappingLevel_.swap(indexToInfoMapping);
272  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
273  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
274  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
275  for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx];
276}
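// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): a minimal sketch of how one
// info element travels through the message buffers above, using the
// ProcessDHTElement pack/unpack helpers exactly as this file uses them.
// InfoType is assumed to be int here purely for illustration; the helper name
// is not part of XIOS.
inline void packUnpackSketch(int valueIn, int& valueOut)
{
  unsigned char* buff = new unsigned char[ProcessDHTElement<int>::typeSize()];
  int writePos = 0, readPos = 0;
  ProcessDHTElement<int>::packElement(valueIn, buff, writePos);   // writePos advances past the packed bytes
  ProcessDHTElement<int>::unpackElement(valueOut, buff, readPos); // readPos advances the same way
  delete [] buff;
}
// ---------------------------------------------------------------------------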
277
278/*!
279  Compute the hash index distribution over the whole size_t space; each client then owns a range of this distribution
280*/
281template<typename T, typename H>
282void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
283{
284  // Compute range of hash index for each client
285  hashedIndex.resize(nbClient+1);
286  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
287  size_t nbHashIndex;
288  hashedIndex[0] = 0;
289  for (int i = 1; i < nbClient; ++i)
290  {
291    nbHashIndex = nbHashIndexMax / nbClient;
292    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
293    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
294  }
295  hashedIndex[nbClient] = nbHashIndexMax;
296}
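// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): a self-contained sketch of the
// owner lookup built on the bounds produced by computeHashIndex above; the same
// upper_bound/distance idiom appears in computeIndexInfoMappingLevel and
// computeDistributedIndex. The helper name is illustrative, not part of XIOS.
inline int ownerOfHashedValue(const std::vector<size_t>& hashedIndex, size_t hashedVal)
{
  // hashedIndex has nbClient+1 entries; client i owns [hashedIndex[i], hashedIndex[i+1]).
  // upper_bound returns the first bound strictly greater than hashedVal, so the
  // owning client is the interval just before it.
  return static_cast<int>(std::distance(hashedIndex.begin(),
                            std::upper_bound(hashedIndex.begin(), hashedIndex.end(), hashedVal))) - 1;
}
// Usage sketch: a global index is first hashed with HashXIOS<size_t>, then the
// destination rank (within the group) is
//   int dest = ownerOfHashedValue(hashedIndex, hashGlobalIndex(globalIndex));
// ---------------------------------------------------------------------------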
297
298/*!
299  Compute the distribution of global indices and their information (usually a server index)
300  Each client already holds a piece of information and its attached indices.
301This information will be redistributed among processes by projecting the indices into the size_t space.
302After the redistribution, each client holds the rearranged indices and their corresponding information.
303  \param [in] indexInfoMap index and its corresponding info (usually server index)
304  \param [in] commLevel communicator of current level
305  \param [in] level current level
306*/
307template<typename T, typename H>
308void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
309                                                            const MPI_Comm& commLevel,
310                                                            int level)
311{
312  int clientRank;
313  MPI_Comm_rank(commLevel,&clientRank);
314  computeSendRecvRank(level, clientRank);
315
316  int groupRankBegin = this->getGroupBegin()[level];
317  int nbClient = this->getNbInGroup()[level];
318  std::vector<size_t> hashedIndex;
319  computeHashIndex(hashedIndex, nbClient);
320
321  std::vector<int> sendBuff(nbClient,0);
322  std::vector<int> sendNbIndexBuff(nbClient,0);
323  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
324                                      iteClientHash = hashedIndex.end();
325  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(),it,
326                                                                 ite = indexInfoMap.end();
327  HashXIOS<size_t> hashGlobalIndex;
328
329  // Compute sizes of the sending and receiving buffers
330  for (it = itb; it != ite; ++it)
331  {
332    size_t hashIndex = hashGlobalIndex(it->first);
333    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
334    {
335      int indexClient = std::distance(itbClientHash, itClientHash)-1;
336      {
337        ++sendNbIndexBuff[indexClient];
338      }
339    }
340  }
341
342  std::map<int, size_t*> client2ClientIndex;
343  std::map<int, unsigned char*> client2ClientInfo;
344  for (int idx = 0; idx < nbClient; ++idx)
345  {
346    if (0 != sendNbIndexBuff[idx])
347    {
348      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
349      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
350      sendNbIndexBuff[idx] = 0;
351      sendBuff[idx] = 1;
352    }
353  }
354
355  std::vector<int> sendNbInfo(nbClient,0);
356  for (it = itb; it != ite; ++it)
357  {
358    size_t hashIndex = hashGlobalIndex(it->first);
359    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
360    {
361      int indexClient = std::distance(itbClientHash, itClientHash)-1;
362      {
363        client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
364        ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
365        ++sendNbIndexBuff[indexClient];
366      }
367    }
368  }
369
370  // Calculate from how many clients each client receives messages,
371  // and the size of the buffer needed to receive them
372  int recvNbClient, recvNbIndexCount;
373  sendRecvRank(level, sendBuff, sendNbIndexBuff,
374               recvNbClient, recvNbIndexCount);
375
376  // If a client holds indices and corresponding information that do not belong to it,
377  // it will send a message to the correct clients.
378  // The contents of the message are the indices and their corresponding information
379  std::list<MPI_Request> sendRequest;
380  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
381                                    iteIndex = client2ClientIndex.end();
382  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
383    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendRequest);
384  std::map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
385                                          iteInfo = client2ClientInfo.end();
386  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
387    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, sendRequest);
388
389
390  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
391  unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];
392
393  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
394  std::map<int, int> countBuffInfo, countBuffIndex;
395  std::vector<int> processedList;
396
397  bool isFinished = (0 == recvNbClient);
398
399  // Running count of global indices already placed in the receiving buffer
400  int countIndex = 0;
401
402  // Running count of info elements already placed in the receiving buffer
403  int countInfo = 0;
404
405  // Requests returned by MPI_Irecv for global index and info messages
406  std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;
407
408  // Mapping from client rank to the beginning position in the receiving buffer for global index messages from this client
409  std::map<int, unsigned long*> indexBuffBegin;
410
411  // Mapping from client rank to the beginning position in the receiving buffer for info messages from this client
412  std::map<int, unsigned char*> infoBuffBegin;
413
414  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
415
416  // Now each client tries to listen for demands from others.
417  // If a message arrives, it is processed: the global indices and the corresponding info are pushed into its map
418  while (!isFinished || (!sendRequest.empty()))
419  {
420    testSendRequest(sendRequest);
421    probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
422                                 countIndex, indexBuffBegin,
423                                 requestRecvIndex, commLevel);
424    // Process completed requests
425    for (itRequestIndex = requestRecvIndex.begin();
426         itRequestIndex != requestRecvIndex.end();
427         ++itRequestIndex)
428    {
429      int rank = itRequestIndex->first;
430      int count = computeBuffCountIndex(itRequestIndex->second);
431      if (0 != count)
432        countBuffIndex[rank] = count;
433    }
434
435    probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
436                                countInfo, infoBuffBegin,
437                                requestRecvInfo, commLevel);
438    for (itRequestInfo = requestRecvInfo.begin();
439         itRequestInfo != requestRecvInfo.end();
440         ++itRequestInfo)
441    {
442      int rank = itRequestInfo->first;
443      int count = computeBuffCountInfo(itRequestInfo->second);
444      if (0 != count)
445        countBuffInfo[rank] = count;
446    }
447
448    for (std::map<int, int>::iterator it = countBuffIndex.begin();
449                                      it != countBuffIndex.end(); ++it)
450    {
451      int rank = it->first;
452      if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
453          (countBuffIndex.end() != countBuffIndex.find(rank)))
454      {
455        int count = it->second;
456        InfoType infoValue;
457        int infoIndex = 0;
458        for (int i = 0; i < count; ++i)
459        {
460          ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuffBegin[rank], infoIndex);
461          indexToInfoMapping.insert(std::make_pair<size_t,InfoType>(*(indexBuffBegin[rank]+i),infoValue));
462        }
463
464        processedList.push_back(rank);
465        --recvNbClient;
466      }
467    }
468
469    for (int i = 0; i < processedList.size(); ++i)
470    {
471      requestRecvInfo.erase(processedList[i]);
472      requestRecvIndex.erase(processedList[i]);
473      countBuffIndex.erase(processedList[i]);
474      countBuffInfo.erase(processedList[i]);
475    }
476
477    if (0 == recvNbClient) isFinished = true;
478  }
479
480  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
481  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) delete [] itInfo->second;
482  delete [] recvIndexBuff;
483  delete [] recvInfoBuff;
484
485  // Now recurse to the next level of the hierarchy
486  if (0 < level)
487  {
488    --level;
489    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
490  }
491  else
492    index2InfoMapping_ = indexToInfoMapping;
493}
494
495/*!
496  Probe and receive messages containing global indices from other clients.
497  Each client can send a message of global indices to other clients to fill their maps.
498Each client probes its message queue; if a message is ready, it is put into the receiving buffer
499  \param [in] recvIndexBuff buffer dedicated to receiving global indices
500  \param [in] recvNbIndexCount size of the buffer
501  \param [in] countIndex number of indices received so far
502  \param [in] indexBuffBegin beginning of the index buffer for each source rank
503  \param [in] requestRecvIndex requests of the index receives
504  \param [in] intraComm communicator
505*/
506template<typename T, typename H>
507void CClientClientDHTTemplate<T,H>::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
508                                                                 const int recvNbIndexCount,
509                                                                 int& countIndex,
510                                                                 std::map<int, unsigned long*>& indexBuffBegin,
511                                                                 std::map<int, MPI_Request>& requestRecvIndex,
512                                                                 const MPI_Comm& intraComm)
513{
514  MPI_Status statusIndexGlobal;
515  int flagIndexGlobal, count;
516
517  // Probing for global index
518  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
519  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
520  {
521    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
522    indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
523    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
524              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
525              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
526    countIndex += count;
527  }
528}
529
530/*!
531  Probe and receive messages containing server indices (info) from other clients.
532  Each client can send a message of server indices to other clients to fill their maps.
533Each client probes its message queue; if a message is ready, it is put into the receiving buffer
534  \param [in] recvInfoBuff buffer dedicated to receiving server indices
535  \param [in] recvNbIndexCount size of the buffer
536  \param [in] countInfo number of info elements received so far
537  \param [in] infoBuffBegin beginning of the info buffer for each source rank
538  \param [in] requestRecvInfo requests of the info receives
539  \param [in] intraComm communicator
540*/
541template<typename T, typename H>
542void CClientClientDHTTemplate<T,H>::probeInfoMessageFromClients(unsigned char* recvInfoBuff,
543                                                                const int recvNbIndexCount,
544                                                                int& countInfo,
545                                                                std::map<int, unsigned char*>& infoBuffBegin,
546                                                                std::map<int, MPI_Request>& requestRecvInfo,
547                                                                const MPI_Comm& intraComm)
548{
549  MPI_Status statusInfo;
550  int flagInfo, count;
551
552  // Probing for server index
553  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
554  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
555  {
556    MPI_Get_count(&statusInfo, MPI_CHAR, &count);
557    unsigned char* beginInfoBuff = recvInfoBuff+countInfo*infoTypeSize;
558    infoBuffBegin.insert(std::make_pair<int, unsigned char*>(statusInfo.MPI_SOURCE, beginInfoBuff));
559    MPI_Irecv(beginInfoBuff, count, MPI_CHAR,
560              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
561              &requestRecvInfo[statusInfo.MPI_SOURCE]);
562
563    countInfo += count/infoTypeSize;
564  }
565}
566
567/*!
568  Send a message containing indices to a client
569  \param [in] clientDestRank rank of destination client
570  \param [in] indices indices to send (indiceSize of them)
571  \param [in] clientIntraComm communication group of clients
572  \param [in] requestSendIndex list of sending requests
573*/
574template<typename T, typename H>
575void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
576                                                       const MPI_Comm& clientIntraComm,
577                                                       std::list<MPI_Request>& requestSendIndex)
578{
579  MPI_Request request;
580  requestSendIndex.push_back(request);
581  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
582            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
583}
584
585/*!
586  Send a message containing information to a client
587  \param [in] clientDestRank rank of destination client
588  \param [in] info packed information (server indices) to send, infoSize bytes in total
589  \param [in] clientIntraComm communication group of clients
590  \param [in] requestSendInfo list of sending requests
591*/
592template<typename T, typename H>
593void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
594                       const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendInfo)
595{
596  MPI_Request request;
597  requestSendInfo.push_back(request);
598
599  MPI_Isend(info, infoSize, MPI_CHAR,
600            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
601}
602
603/*!
604  Verify the status of sending requests and remove the completed ones
605  \param [in] sendRequest list of sending requests to verify
606*/
607template<typename T, typename H>
608void CClientClientDHTTemplate<T,H>::testSendRequest(std::list<MPI_Request>& sendRequest)
609{
610  int flag = 0;
611  MPI_Status status;
612  std::list<MPI_Request>::iterator itRequest;
613  int sizeListRequest = sendRequest.size();
614  int idx = 0;
615  while (idx < sizeListRequest)
616  {
617    bool isErased = false;
618    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
619    {
620      MPI_Test(&(*itRequest), &flag, &status);
621      if (true == flag)
622      {
623        isErased = true;
624        break;
625      }
626    }
627    if (true == isErased) sendRequest.erase(itRequest);
628    ++idx;
629  }
630}
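// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): a behaviourally similar,
// single-pass variant of the loop above, relying on std::list::erase returning
// the iterator that follows the erased element. Shown only as a sketch; the
// function name is illustrative, not part of XIOS.
inline void testSendRequestSketch(std::list<MPI_Request>& sendRequest)
{
  std::list<MPI_Request>::iterator itRequest = sendRequest.begin();
  while (itRequest != sendRequest.end())
  {
    int flag = 0;
    MPI_Status status;
    MPI_Test(&(*itRequest), &flag, &status);              // non-blocking completion check
    if (flag) itRequest = sendRequest.erase(itRequest);   // drop the completed request
    else      ++itRequest;                                // keep the pending request
  }
}
// ---------------------------------------------------------------------------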
631
632/*!
633  Compute the size of a message containing global indices
634  \param[in] requestRecv request of message
635*/
636template<typename T, typename H>
637int CClientClientDHTTemplate<T,H>::computeBuffCountIndex(MPI_Request& requestRecv)
638{
639  int flag, count = 0;
640  MPI_Status status;
641
642  MPI_Test(&requestRecv, &flag, &status);
643  if (true == flag)
644  {
645    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
646  }
647
648  return count;
649}
650
651/*!
652  Compute the size of a message containing server indices
653  \param[in] requestRecv request of message
654*/
655template<typename T, typename H>
656int CClientClientDHTTemplate<T,H>::computeBuffCountInfo(MPI_Request& requestRecv)
657{
658  int flag, count = 0;
659  MPI_Status status;
660
661  MPI_Test(&requestRecv, &flag, &status);
662  if (true == flag)
663  {
664    MPI_Get_count(&status, MPI_CHAR, &count);
665  }
666
667  return (count/infoTypeSize);
668}
669
670/*!
671  Compute how many processes one process needs to send to and from how many processes it will receive
672*/
673template<typename T, typename H>
674void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
675{
676  int groupBegin = this->getGroupBegin()[level];
677  int nbInGroup  = this->getNbInGroup()[level];
678  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
679  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];
680
681  std::vector<size_t> hashedIndexGroup;
682  computeHashIndex(hashedIndexGroup, nbInGroup);
683  size_t a = hashedIndexGroup[rank-groupBegin];
684  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;
685
686  int currentGroup, offset;
687  size_t e,f;
688
689  // Simple interval math: intersect [a,b] with [c,d] (both ranges are closed, given the -1 above)
690  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
691  {
692    std::vector<size_t> hashedIndexGroupParent;
693    int nbInGroupParent = nbInGroupParents[idx];
694    if (0 != nbInGroupParent)
695      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
696    for (int i = 0; i < nbInGroupParent; ++i)
697    {
698      size_t c = hashedIndexGroupParent[i];
699      size_t d = hashedIndexGroupParent[i+1]-1;
700
701      if (!((d < a) || (b < c)))
702        recvRank_[level].push_back(groupParentBegin[idx]+i);
703    }
704
705    offset = rank - groupParentBegin[idx];
706    if ((offset<nbInGroupParents[idx]) && (0 <= offset))
707    {
708      e = hashedIndexGroupParent[offset];
709      f = hashedIndexGroupParent[offset+1]-1;
710    }
711  }
712
713  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
714                                      iteHashGroup = hashedIndexGroup.end();
715  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
716  int begin = std::distance(itbHashGroup, itHashGroup)-1;
717  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
718  int end = std::distance(itbHashGroup, itHashGroup) -1;
719  sendRank_[level].resize(end-begin+1);
720  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
721}
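// ---------------------------------------------------------------------------
// Editor's note (not part of the original file): the overlap test used above,
// written out for two closed integer ranges [a,b] and [c,d]; illustrative only.
inline bool rangesOverlap(size_t a, size_t b, size_t c, size_t d)
{
  // The ranges are disjoint exactly when one ends before the other begins,
  // so they overlap when neither (d < a) nor (b < c) holds.
  return !((d < a) || (b < c));
}
// e.g. rangesOverlap(10, 20, 15, 30) == true, rangesOverlap(10, 20, 21, 30) == false
// ---------------------------------------------------------------------------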
722
723/*!
724  Send and receive the number of processes each process needs to listen to, as well as the number
725  of indices it will receive
726*/
727template<typename T, typename H>
728void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
729                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
730                                                 int& recvNbRank, int& recvNbElements)
731{
732  int groupBegin = this->getGroupBegin()[level];
733
734  int offSet = 0;
735  std::vector<int>& sendRank = sendRank_[level];
736  std::vector<int>& recvRank = recvRank_[level];
737  int sendBuffSize = sendRank.size();
738  int* sendBuff = new int [sendBuffSize*2];
739  std::vector<MPI_Request> request(sendBuffSize);
740  std::vector<MPI_Status> requestStatus(sendBuffSize);
741  int recvBuffSize = recvRank.size();
742  int* recvBuff = new int [2];
743
744  for (int idx = 0; idx < sendBuffSize; ++idx)
745  {
746    offSet = sendRank[idx]-groupBegin;
747    sendBuff[idx*2] = sendNbRank[offSet];
748    sendBuff[idx*2+1] = sendNbElements[offSet];
749  }
750
751  for (int idx = 0; idx < sendBuffSize; ++idx)
752  {
753    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
754              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[idx]);
755  }
756
757  MPI_Status status;
758  int nbRecvRank = 0, nbRecvElements = 0;
759  for (int idx = 0; idx < recvBuffSize; ++idx)
760  {
761    MPI_Recv(recvBuff, 2, MPI_INT,
762             recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &status);
763    nbRecvRank += *(recvBuff);
764    nbRecvElements += *(recvBuff+1);
765  }
766
767  MPI_Waitall(sendBuffSize, &request[0], &requestStatus[0]);
768
769  recvNbRank = nbRecvRank;
770  recvNbElements = nbRecvElements;
771
772  delete [] sendBuff;
773  delete [] recvBuff;
774}
775
776}