source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 835

Last change on this file since 835 was 835, checked in by mhnguyen, 8 years ago

Exposing some functions to Fortran interface

+) Allow add axis and domain into grid with Fortran interface
+) Remove some redundant code

Test
+) On Curie
+) test_client passes

/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 23 March 2016

   \brief Distributed hash table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
/*!
  Constructor with initial distribution information and the corresponding index.
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into the size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm)
  : H(clientIntraComm), index2InfoMapping_(), indexToInfoMappingLevel_()
{
  this->computeMPICommLevel();
  int nbLvl = this->getNbLevel();
  sendRank_.resize(nbLvl);
  recvRank_.resize(nbLvl);
  computeDistributedIndex(indexInfoMap, clientIntraComm, nbLvl-1);
}
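
/*
  Minimal usage sketch (illustrative only, not taken from the XIOS sources): the policy
  class CMyHierarchyPolicy is a hypothetical stand-in for the template parameter H,
  which must provide computeMPICommLevel(), getNbLevel(), the group accessors and
  internalComm_ used below.

    boost::unordered_map<size_t,int> indexInfoMap;   // global index -> info (e.g. a server rank)
    indexInfoMap[42] = 3;                            // info contributed by this process
    CClientClientDHTTemplate<int, CMyHierarchyPolicy> dht(indexInfoMap, clientIntraComm);

    CArray<size_t,1> wanted(1);
    wanted(0) = 7;                                   // index whose info may live on another process
    dht.computeIndexInfoMapping(wanted);             // collective: every process must call it
*/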

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices owned by the calling process
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int nbLvl = this->getNbLevel();
  computeIndexInfoMappingLevel(indices, this->internalComm_, nbLvl-1);
}

/*!
    Compute mapping between indices and information corresponding to these indices
for each level of the hierarchical DHT. Recursive function.
   \param [in] indices indices owned by the calling process
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const MPI_Comm& commLevel,
                                                                 int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);

  // Number of global indices whose mapping information is held by other clients
  int nbIndexToSend = 0;
  size_t index;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    int indexClient = std::distance(itbClientHash, itClientHash)-1;
    ++sendNbIndexBuff[indexClient];
  }

  std::map<int, size_t* > client2ClientIndex;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      nbIndexToSend += sendNbIndexBuff[idx];
      sendBuff[idx] = 1;
      sendNbIndexBuff[idx] = 0;
    }
  }

  for (int i = 0; i < ssize; ++i)
  {
    index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient+groupRankBegin][sendNbIndexBuff[indexClient]] = index;
        ++sendNbIndexBuff[indexClient];
      }
    }
  }

  int recvNbClient, recvNbIndexCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvNbClient, recvNbIndexCount);

  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                    iteIndex = client2ClientIndex.end();

  std::list<MPI_Request> sendIndexRequest;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, (itIndex->second), sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendIndexRequest);

  int nbDemandingClient = recvNbClient;
  int nbSendBuffInfoReceived = 0;

  // Receive requests as well as the responses from other clients.
  // A request message contains global indices; a response carries the corresponding information (e.g. server indices).
  // Buffer to receive requests from other clients; it is allocated only if there is something to receive.
  // Duplicate indices may be requested, so the maximum size of the receiving buffer must be determined.
  unsigned long* recvBuffIndex = 0;
  int maxNbIndexDemandedFromOthers = recvNbIndexCount;
  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer to receive responses from other clients; it is allocated only if this client has requested something.
  unsigned char* recvBuffInfo = 0;
  int nbIndexReceivedFromOthers = nbIndexToSend;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffInfo = new unsigned char[nbIndexReceivedFromOthers*ProcessDHTElement<InfoType>::typeSize()];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;

  int countIndex = 0;  // Running count of received indices in the buffer
  std::map<int, MPI_Request> requestRecvIndex; // Requests returned by MPI_Irecv for index messages

  // Maps a client rank to the beginning position in the receiving buffer of the index message from that client
  std::map<int, unsigned long*> indexBuffBegin;

  std::map<int,std::vector<size_t> > src2Index; // Temporary mapping of requests (source rank and associated indices) at the current level

  CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);

  int k = 0;
  while ((0 < nbDemandingClient) || (!sendIndexRequest.empty()))
  {
    // Check whether this client has any pending requests from other clients.
    // If so, it must send responses to those clients.
    probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndex.begin();
           itRequest != requestRecvIndex.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            src2Index[clientSourceRank].push_back(*(beginBuff+i));
            tmpGlobalIndexOnClient(k) = *(beginBuff+i);
            ++k;
          }
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i < demandAlreadyReceived.size(); ++i)
        requestRecvIndex.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendIndexRequest);
  }

  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->internalComm_, level);
  }
  else
    indexToInfoMappingLevel_ = index2InfoMapping_;

  std::map<int, std::vector<InfoType> > client2ClientInfo;
  std::vector<unsigned char*> infoToSend(src2Index.size());
  std::list<MPI_Request> sendInfoRequest;
  std::map<int, std::vector<size_t> >::iterator itSrc2Idx = src2Index.begin(),
                                                iteSrc2Idx = src2Index.end();
  for (int i = 0; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx, ++i)
  {
    int clientSourceRank = itSrc2Idx->first;
    std::vector<size_t>& srcIdx = itSrc2Idx->second;
    infoToSend[i] = new unsigned char [srcIdx.size()*ProcessDHTElement<InfoType>::typeSize()];
    int infoIndex = 0;
    for (int idx = 0; idx < srcIdx.size(); ++idx)
    {
      ProcessDHTElement<InfoType>::packElement(indexToInfoMappingLevel_[srcIdx[idx]], infoToSend[i], infoIndex);
    }
    sendInfoToClients(clientSourceRank, infoToSend[i], infoIndex, commLevel, sendInfoRequest);
  }

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
  int countInfo = 0; // Running count of received info elements in the buffer
  std::map<int, MPI_Request> requestRecvInfo;

  // Maps a client rank to the beginning position in the receiving buffer of the info message from that client
  std::map<int, unsigned char*> infoBuffBegin;

  while ((!sendInfoRequest.empty()) || (nbSendBuffInfoReceived < nbIndexReceivedFromOthers))
  {
    testSendRequest(sendInfoRequest);

    // A client may need to listen for responses from other clients carrying the requested information.
    // With this information, the client can fill in its global-index-to-information map.
    probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequest = requestRecvInfo.begin();
         itRequest != requestRecvInfo.end();
         ++itRequest)
    {
      int flagInfo, count;
      MPI_Status statusInfo;

      MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
      if (true == flagInfo)
      {
        MPI_Get_count(&statusInfo, MPI_CHAR, &count);
        int actualCountInfo = count/infoTypeSize;
        int clientSourceRank = statusInfo.MPI_SOURCE;
        unsigned char* beginBuff = infoBuffBegin[clientSourceRank];
        size_t* indexTmp = client2ClientIndex[clientSourceRank];
        int infoIndex = 0;
        for (int i = 0; i < actualCountInfo; ++i)
        {
          ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[indexTmp[i]], beginBuff, infoIndex);
        }
        nbSendBuffInfoReceived += actualCountInfo;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i < repondAlreadyReceived.size(); ++i)
      requestRecvInfo.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
  for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx];
}
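
/*
  The request/response exchange above relies on a standard non-blocking MPI pattern:
  probe for an incoming message, post a matching MPI_Irecv, then complete it with
  MPI_Test. A minimal self-contained sketch of that pattern (illustrative only,
  independent of the XIOS classes; the tag value 100 is arbitrary):

    MPI_Status status;
    int flag = 0;
    MPI_Iprobe(MPI_ANY_SOURCE, 100, comm, &flag, &status);
    if (flag)
    {
      int count;
      MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);   // message size in elements
      std::vector<unsigned long> buff(count);
      MPI_Request req;
      MPI_Irecv(&buff[0], count, MPI_UNSIGNED_LONG,
                status.MPI_SOURCE, 100, comm, &req);       // non-blocking receive
      int done = 0;
      MPI_Status recvStatus;
      MPI_Test(&req, &done, &recvStatus);                  // poll repeatedly until done
    }
*/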

/*!
  Compute the hash index distribution over the whole size_t space; each client is responsible for one range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
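
/*
  Example (illustrative, using a small range instead of the real size_t maximum):
  if the hash space were [0, 100] and nbClient = 4, the boundaries would be
  hashedIndex = {0, 25, 50, 75, 100}. A hashed value h is then assigned to client
  std::upper_bound(hashedIndex.begin(), hashedIndex.end(), h) - hashedIndex.begin() - 1;
  for instance h = 60 falls into the range [50, 75) owned by client 2.
*/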

/*!
  Compute the distribution of global indices and their information among clients.
  Each client already holds a piece of information and its attached index.
This information will be redistributed among processes by projecting indices into the size_t space.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                            const MPI_Comm& commLevel,
                                                            int level)
{
  int clientRank;
  MPI_Comm_rank(commLevel,&clientRank);
  computeSendRecvRank(level, clientRank);

  int groupRankBegin = this->getGroupBegin()[level];
  int nbClient = this->getNbInGroup()[level];
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  std::vector<int> sendBuff(nbClient,0);
  std::vector<int> sendNbIndexBuff(nbClient,0);
  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename boost::unordered_map<size_t,InfoType>::const_iterator itb = indexInfoMap.begin(), it,
                                                                 ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;

  // Compute the size of the sending and receiving buffers
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        ++sendNbIndexBuff[indexClient];
      }
    }
  }

  std::map<int, size_t*> client2ClientIndex;
  std::map<int, unsigned char*> client2ClientInfo;
  for (int idx = 0; idx < nbClient; ++idx)
  {
    if (0 != sendNbIndexBuff[idx])
    {
      client2ClientIndex[idx+groupRankBegin] = new unsigned long [sendNbIndexBuff[idx]];
      client2ClientInfo[idx+groupRankBegin] = new unsigned char [sendNbIndexBuff[idx]*ProcessDHTElement<InfoType>::typeSize()];
      sendNbIndexBuff[idx] = 0;
      sendBuff[idx] = 1;
    }
  }

  std::vector<int> sendNbInfo(nbClient,0);
  for (it = itb; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient + groupRankBegin][sendNbIndexBuff[indexClient]] = it->first;
        ProcessDHTElement<InfoType>::packElement(it->second, client2ClientInfo[indexClient + groupRankBegin], sendNbInfo[indexClient]);
        ++sendNbIndexBuff[indexClient];
      }
    }
  }

  // Compute how many clients each client will receive messages from,
  // and the size of the buffer needed to receive them.
  int recvNbClient, recvNbIndexCount;
  sendRecvRank(level, sendBuff, sendNbIndexBuff,
               recvNbClient, recvNbIndexCount);

  // If a client holds indices and the corresponding information that do not belong to it,
  // it sends them to the clients that own them.
  // Each message contains indices and their corresponding information.
  std::list<MPI_Request> sendRequest;
  std::map<int, size_t* >::iterator itbIndex = client2ClientIndex.begin(), itIndex,
                                    iteIndex = client2ClientIndex.end();
  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, sendNbIndexBuff[itIndex->first-groupRankBegin], commLevel, sendRequest);
  std::map<int, unsigned char*>::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                          iteInfo = client2ClientInfo.end();
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, sendNbInfo[itInfo->first-groupRankBegin], commLevel, sendRequest);

  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
  unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];

  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
  std::map<int, int> countBuffInfo, countBuffIndex;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Running count of received global indices in the buffer
  int countIndex = 0;

  // Running count of received info elements in the buffer
  int countInfo = 0;

  // Requests returned by MPI_Irecv for global index and info messages
  std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;

  // Maps a client rank to the beginning position in the receiving buffer of the global index message from that client
  std::map<int, unsigned long*> indexBuffBegin;

  // Maps a client rank to the beginning position in the receiving buffer of the info message from that client
  std::map<int, unsigned char*> infoBuffBegin;

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;

  // Now each client listens for incoming messages from the others.
  // When a message arrives, it is processed: the global indices and their corresponding information are inserted into the local map.
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    // Process completed receive requests
    for (itRequestIndex = requestRecvIndex.begin();
         itRequestIndex != requestRecvIndex.end();
         ++itRequestIndex)
    {
      int rank = itRequestIndex->first;
      int count = computeBuffCountIndex(itRequestIndex->second);
      if (0 != count)
        countBuffIndex[rank] = count;
    }

    probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequestInfo = requestRecvInfo.begin();
         itRequestInfo != requestRecvInfo.end();
         ++itRequestInfo)
    {
      int rank = itRequestInfo->first;
      int count = computeBuffCountInfo(itRequestInfo->second);
      if (0 != count)
        countBuffInfo[rank] = count;
    }

    for (std::map<int, int>::iterator it = countBuffIndex.begin();
                                      it != countBuffIndex.end(); ++it)
    {
      int rank = it->first;
      if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
          (countBuffIndex.end() != countBuffIndex.find(rank)))
      {
        int count = it->second;
        InfoType infoValue;
        int infoIndex = 0;
        for (int i = 0; i < count; ++i)
        {
          ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuffBegin[rank], infoIndex);
          indexToInfoMapping.insert(std::make_pair<size_t,InfoType>(*(indexBuffBegin[rank]+i),infoValue));
        }

        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvInfo.erase(processedList[i]);
      requestRecvIndex.erase(processedList[i]);
      countBuffIndex.erase(processedList[i]);
      countBuffInfo.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  for (itIndex = itbIndex; itIndex != iteIndex; ++itIndex) delete [] itIndex->second;
  for (itInfo = itbInfo; itInfo != iteInfo; ++itInfo) delete [] itInfo->second;
  delete [] recvIndexBuff;
  delete [] recvInfoBuff;

  // Recurse to the next level of the hierarchy
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->internalComm_, level);
  }
  else
    index2InfoMapping_ = indexToInfoMapping;
}
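
/*
  Routing summary (descriptive, based on the code above): every (globalIndex, info)
  pair of indexInfoMap is sent to the client whose hash range contains
  hashGlobalIndex(globalIndex); that client stores the pair in indexToInfoMapping and,
  at level 0, in index2InfoMapping_. A later lookup (computeIndexInfoMappingLevel)
  hashes the requested index the same way, asks its hash owner, and receives the
  stored information back.
*/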

/*!
  Probe and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients to fill their maps.
Each client probes its message queue; if a message is ready, it is received into the receiving buffer.
  \param [in] recvIndexBuff buffer dedicated to receiving global indices
  \param [in] recvNbIndexCount size of the buffer
  \param [in,out] countIndex number of indices received so far
  \param [in,out] indexBuffBegin beginning position of the index buffer for each source rank
  \param [in,out] requestRecvIndex receive requests for index messages
  \param [in] intraComm communicator
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
                                                                 const int recvNbIndexCount,
                                                                 int& countIndex,
                                                                 std::map<int, unsigned long*>& indexBuffBegin,
                                                                 std::map<int, MPI_Request>& requestRecvIndex,
                                                                 const MPI_Comm& intraComm)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probe for a global index message
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
    countIndex += count;
  }
}

/*!
  Probe and receive messages containing information (e.g. server indices) from other clients.
  Each client can send a message of information to other clients to fill their maps.
Each client probes its message queue; if a message is ready, it is received into the receiving buffer.
  \param [in] recvInfoBuff buffer dedicated to receiving information
  \param [in] recvNbIndexCount size of the buffer
  \param [in,out] countInfo number of info elements received so far
  \param [in,out] infoBuffBegin beginning position of the info buffer for each source rank
  \param [in,out] requestRecvInfo receive requests for info messages
  \param [in] intraComm communicator
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::probeInfoMessageFromClients(unsigned char* recvInfoBuff,
                                                                const int recvNbIndexCount,
                                                                int& countInfo,
                                                                std::map<int, unsigned char*>& infoBuffBegin,
                                                                std::map<int, MPI_Request>& requestRecvInfo,
                                                                const MPI_Comm& intraComm)
{
  MPI_Status statusInfo;
  int flagInfo, count;

  // Probe for an information message
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
  {
    MPI_Get_count(&statusInfo, MPI_CHAR, &count);
    unsigned char* beginInfoBuff = recvInfoBuff+countInfo*infoTypeSize;
    infoBuffBegin.insert(std::make_pair<int, unsigned char*>(statusInfo.MPI_SOURCE, beginInfoBuff));
    MPI_Irecv(beginInfoBuff, count, MPI_CHAR,
              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
              &requestRecvInfo[statusInfo.MPI_SOURCE]);

    countInfo += count/infoTypeSize;
  }
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] indiceSize number of indices to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, size_t* indices, size_t indiceSize,
                                                       const MPI_Comm& clientIntraComm,
                                                       std::list<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(indices, indiceSize, MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of destination client
  \param [in] info packed information (usually server indices) to send
  \param [in] infoSize size in bytes of the information to send
  \param [in] clientIntraComm communication group of client
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                                                      const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);

  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Check the status of pending send requests and remove the completed ones
  \param [in,out] sendRequest list of send requests to check
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}
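
/*
  Note on the loop above: each pass of the outer while loop removes at most one
  completed request, so up to sendRequest.size() passes are made. MPI_Test both
  checks for completion and, on success, frees the tested request.
*/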

/*!
  Compute the size of a received message containing global indices
  \param [in] requestRecv receive request of the message
*/
template<typename T, typename H>
int CClientClientDHTTemplate<T,H>::computeBuffCountIndex(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute the size of a received message containing information (e.g. server indices)
  \param [in] requestRecv receive request of the message
*/
template<typename T, typename H>
int CClientClientDHTTemplate<T,H>::computeBuffCountInfo(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_CHAR, &count);
  }

  return (count/infoTypeSize);
}

/*!
  Compute how many processes one process needs to send to and from how many processes it will receive
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeSendRecvRank(int level, int rank)
{
  int groupBegin = this->getGroupBegin()[level];
  int nbInGroup  = this->getNbInGroup()[level];
  const std::vector<int>& groupParentBegin = this->getGroupParentsBegin()[level];
  const std::vector<int>& nbInGroupParents = this->getNbInGroupParents()[level];

  std::vector<size_t> hashedIndexGroup;
  computeHashIndex(hashedIndexGroup, nbInGroup);
  size_t a = hashedIndexGroup[rank-groupBegin];
  size_t b = hashedIndexGroup[rank-groupBegin+1]-1;

  int offset;
  size_t e = 0, f = 0;

  // Simple interval intersection: [a,b] with [c,d]
  for (int idx = 0; idx < groupParentBegin.size(); ++idx)
  {
    std::vector<size_t> hashedIndexGroupParent;
    int nbInGroupParent = nbInGroupParents[idx];
    if (0 != nbInGroupParent)
      computeHashIndex(hashedIndexGroupParent, nbInGroupParent);
    for (int i = 0; i < nbInGroupParent; ++i)
    {
      size_t c = hashedIndexGroupParent[i];
      size_t d = hashedIndexGroupParent[i+1]-1;

      if (!((d < a) || (b < c)))
        recvRank_[level].push_back(groupParentBegin[idx]+i);
    }

    offset = rank - groupParentBegin[idx];
    if ((offset < nbInGroupParents[idx]) && (0 <= offset))
    {
      e = hashedIndexGroupParent[offset];
      f = hashedIndexGroupParent[offset+1]-1;
    }
  }

  std::vector<size_t>::const_iterator itbHashGroup = hashedIndexGroup.begin(), itHashGroup,
                                      iteHashGroup = hashedIndexGroup.end();
  itHashGroup = std::lower_bound(itbHashGroup, iteHashGroup, e+1);
  int begin = std::distance(itbHashGroup, itHashGroup)-1;
  itHashGroup = std::upper_bound(itbHashGroup, iteHashGroup, f);
  int end = std::distance(itbHashGroup, itHashGroup)-1;
  sendRank_[level].resize(end-begin+1);
  for (int idx = 0; idx < sendRank_[level].size(); ++idx) sendRank_[level][idx] = idx + groupBegin + begin;
}
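
/*
  Interval intersection example (illustrative): suppose this process owns the hash
  range [a,b] = [25,49] at the current level and a parent-level process owns
  [c,d] = [40,60]. Since !((d < a) || (b < c)) is true, that parent-level process is
  added to recvRank_[level]. Conversely, the range [e,f] owned by this process at the
  parent level selects, via lower_bound/upper_bound on hashedIndexGroup, the processes
  of the current group it must send to.
*/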

/*!
  Exchange, for each process, the number of processes it must listen to as well as the number
  of indices it will receive
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendRecvRank(int level,
                                                 const std::vector<int>& sendNbRank, const std::vector<int>& sendNbElements,
                                                 int& recvNbRank, int& recvNbElements)
{
  int groupBegin = this->getGroupBegin()[level];

  int offSet = 0;
  std::vector<int>& sendRank = sendRank_[level];
  std::vector<int>& recvRank = recvRank_[level];
  int sendBuffSize = sendRank.size();
  int* sendBuff = new int [sendBuffSize*2];
  std::vector<MPI_Request> request(sendBuffSize);
  std::vector<MPI_Status> requestStatus(sendBuffSize);
  int recvBuffSize = recvRank.size();
  int* recvBuff = new int [2];

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    offSet = sendRank[idx]-groupBegin;
    sendBuff[idx*2] = sendNbRank[offSet];
    sendBuff[idx*2+1] = sendNbElements[offSet];
  }

  for (int idx = 0; idx < sendBuffSize; ++idx)
  {
    MPI_Isend(&sendBuff[idx*2], 2, MPI_INT,
              sendRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &request[idx]);
  }

  MPI_Status status;
  int nbRecvRank = 0, nbRecvElements = 0;
  for (int idx = 0; idx < recvBuffSize; ++idx)
  {
    MPI_Recv(recvBuff, 2, MPI_INT,
             recvRank[idx], MPI_DHT_INDEX_1, this->internalComm_, &status);
    nbRecvRank += *(recvBuff);
    nbRecvElements += *(recvBuff+1);
  }

  MPI_Waitall(sendBuffSize, &request[0], &requestStatus[0]);

  recvNbRank = nbRecvRank;
  recvNbElements = nbRecvElements;

  delete [] sendBuff;
  delete [] recvBuff;
}
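
/*
  Message layout used above: each pair of ints sent to sendRank[idx] is
  {sendNbRank[offSet], sendNbElements[offSet]}; in the callers, sendNbRank holds 1 for
  every targeted rank and sendNbElements the number of indices destined for it. A
  receiver sums the pairs over all its recvRank entries to obtain recvNbRank (how many
  senders to expect) and recvNbElements (the total buffer size to allocate).
*/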

} // namespace xios