source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 830

Last change on this file since 830 was 830, checked in by mhnguyen, 8 years ago

Implementing pack and unpack functions for the generic type of the DHT

+) Replace the union with more generic functions for packing data into a buffer and unpacking data from it (a hedged sketch of such a pack/unpack pair follows below)
+) Change PairIntInt from a struct to std::pair<int,int>

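The generic packing interface appears throughout the file below as ProcessDHTElement<T>::typeSize(), packElement() and unpackElement(). The following is a minimal, hypothetical sketch of such a byte-buffer pack/unpack helper for a trivially copyable type, written only to illustrate the calling convention visible in the code (including the NULL-buffer pass that the sending code uses to size its buffers); the ProcessDHTElementSketch name and its body are assumptions, not the XIOS implementation.

#include <cstring>  // std::memcpy
#include <utility>  // std::pair

// Hypothetical sketch only: pack/unpack a trivially copyable element into a raw
// byte buffer, advancing 'position' by the element's size. Passing a NULL buffer
// to packElement only advances the offset, so a first pass can compute buffer sizes.
template <typename T>
struct ProcessDHTElementSketch
{
  static int typeSize() { return static_cast<int>(sizeof(T)); }

  static void packElement(const T& value, unsigned char* buffer, int& position)
  {
    if (buffer != 0) std::memcpy(buffer + position, &value, sizeof(T));
    position += static_cast<int>(sizeof(T));
  }

  static void unpackElement(T& value, unsigned char* buffer, int& position)
  {
    std::memcpy(&value, buffer + position, sizeof(T));
    position += static_cast<int>(sizeof(T));
  }
};

// Usage sketch, e.g. for an info element of type std::pair<int,int>:
//   int pos = 0;
//   unsigned char buf[sizeof(std::pair<int,int>)];
//   ProcessDHTElementSketch< std::pair<int,int> >::packElement(std::make_pair(1, 2), buf, pos);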
Test
+) On Curie
+) All tests pass

File size: 25.5 KB
/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 05 Oct 2015

   \brief Distributed hashed table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
/*!
  Constructor with initial distribution information and the corresponding index.
  Each client (process) holds a piece of information as well as the attached index; the index
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
  \param [in] hierarLvl level of hierarchy
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                        const MPI_Comm& clientIntraComm,
                                                        int hierarLvl)
  : index2InfoMapping_(), indexToInfoMappingLevel_()
{
  this->computeMPICommLevel(clientIntraComm, hierarLvl);
  int lvl = this->commLevel_.size() - 1;
  computeDistributedIndex(indexInfoMap, this->commLevel_[lvl], lvl);
}

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int lvl = this->commLevel_.size() - 1;
  computeIndexInfoMappingLevel(indices, this->commLevel_[lvl], lvl);
}

/*!
    Compute mapping between indices and information corresponding to these indices
for each level of hierarchical DHT. Recursive function
   \param [in] indices indices a proc has
   \param [in] commLevel communicator of current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const MPI_Comm& commLevel,
                                                                 int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel,&nbClient);
  MPI_Comm_rank(commLevel,&clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::map<int, std::vector<size_t> > client2ClientIndex;

  // Number of global indices whose mapped information is held by other clients
  int nbIndexToSend = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient].push_back(index);
        ++nbIndexToSend;
      }
    }
  }
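  // At this point client2ClientIndex maps each destination rank (possibly this rank itself)
  // to the indices whose hashed values fall in that rank's hash range, and nbIndexToSend counts them all.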

  int* sendBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndex.begin(), it,
                                                ite  = client2ClientIndex.end();
  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);

  std::list<MPI_Request> sendIndexRequest;
  if (0 != nbIndexToSend)
      for (it = itb; it != ite; ++it)
         sendIndexToClients(it->first, it->second, commLevel, sendIndexRequest);

  int nbDemandingClient = recvBuff[clientRank], nbSendBuffInfoReceived = 0;

  // Receive the demands as well as the responses from other clients.
  // A demand message contains global indices; a response carries the corresponding information (e.g. server indices).
  // The buffer receiving demands from other clients is only allocated if there are demands to receive.
  // Since the same index may be demanded several times, the maximum size of the demand buffer must be determined.
  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
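  // After this reduction, recvBuff[clientRank] holds the total number of indices that
  // other clients will request from this rank; it is used to size recvBuffIndex below.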

  unsigned long* recvBuffIndex = 0;
  int maxNbIndexDemandedFromOthers = recvBuff[clientRank];

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer to receive responses from other clients; it is only allocated if this client has sent demands to other clients
//  InfoType* recvBuffInfo = 0;
  unsigned char* recvBuffInfo = 0;
  int nbIndexReceivedFromOthers = nbIndexToSend;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffInfo = new unsigned char[nbIndexReceivedFromOthers*ProcessDHTElement<InfoType>::typeSize()];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;

  int countIndex = 0;  // Count of elements already placed in the index receiving buffer
  std::map<int, MPI_Request> requestRecvIndex; // Requests returned by MPI_Irecv for index messages

  // Mapping between client rank and the beginning position of the receiving buffer for index messages from this client
  std::map<int, unsigned long*> indexBuffBegin;

  std::map<int,std::vector<size_t> > src2Index; // Temporary mapping holding the demands (source rank and associated indices) at the current level

  CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);

  int k = 0;
  while ((0 < nbDemandingClient) || (!sendIndexRequest.empty()))
  {
    // Check whether this client has any pending demand from other clients.
    // If it has, it should send a response to these client(s)
    probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndex.begin();
           itRequest != requestRecvIndex.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            src2Index[clientSourceRank].push_back(*(beginBuff+i));
            tmpGlobalIndexOnClient(k) = *(beginBuff+i);
            ++k;
          }
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
        requestRecvIndex.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendIndexRequest);
  }

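  // tmpGlobalIndexOnClient now holds the indices that other clients have asked this rank
  // to resolve at this level. Their information is obtained either by recursing into the
  // next lower level of the hierarchy or, at level 0, from the locally stored mapping.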
  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->commLevel_[level], level);
  }
  else
    indexToInfoMappingLevel_ = index2InfoMapping_;

  std::map<int, std::vector<InfoType> > client2ClientInfo;
  std::vector<unsigned char*> infoToSend(src2Index.size());
  std::list<MPI_Request> sendInfoRequest;
  std::map<int, std::vector<size_t> >::iterator itSrc2Idx = src2Index.begin(),
                                                iteSrc2Idx = src2Index.end();
  for (int i=0; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx, ++i)
  {
    int clientSourceRank = itSrc2Idx->first;
    std::vector<size_t>& srcIdx = itSrc2Idx->second;
    infoToSend[i] = new unsigned char [srcIdx.size()*ProcessDHTElement<InfoType>::typeSize()];
    int infoIndex = 0;
    for (int idx = 0; idx < srcIdx.size(); ++idx)
    {
      ProcessDHTElement<InfoType>::packElement(indexToInfoMappingLevel_[srcIdx[idx]], infoToSend[i], infoIndex);
    }
    sendInfoToClients(clientSourceRank, infoToSend[i], infoIndex, commLevel, sendInfoRequest);
  }

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
  int countInfo = 0; // Count of elements already placed in the info receiving buffer
  std::map<int, MPI_Request> requestRecvInfo;

  // Mapping between client rank and the beginning position of the receiving buffer for info messages from this client
  std::map<int, unsigned char*> infoBuffBegin;

  while ((!sendInfoRequest.empty()) || (nbSendBuffInfoReceived < nbIndexReceivedFromOthers))
  {
    testSendRequest(sendInfoRequest);

    // A client may have to listen for responses from other clients carrying the requested information.
    // With this information, it can fill in its index-to-information map.
    probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequest = requestRecvInfo.begin();
         itRequest != requestRecvInfo.end();
         ++itRequest)
    {
      int flagInfo, count;
      MPI_Status statusInfo;

      MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
      if (true == flagInfo)
      {
        MPI_Get_count(&statusInfo, MPI_CHAR, &count);
        int actualCountInfo = count/infoTypeSize;
        int clientSourceRank = statusInfo.MPI_SOURCE;
        unsigned char* beginBuff = infoBuffBegin[clientSourceRank];
        std::vector<size_t>& indexTmp = client2ClientIndex[clientSourceRank];
        int infoIndex = 0;
        for (int i = 0; i < actualCountInfo; ++i)
        {
          ProcessDHTElement<InfoType>::unpackElement(indexToInfoMapping[indexTmp[i]], beginBuff, infoIndex);
        }
        nbSendBuffInfoReceived += actualCountInfo;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
      requestRecvInfo.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
  for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx];
  delete [] sendBuff;
  delete [] recvBuff;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned a contiguous range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices owned by each client
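  // hashedIndex holds nbClient+1 boundaries: client i owns hashed values in
  // [hashedIndex[i], hashedIndex[i+1]), i.e. the size_t space is split into
  // nbClient nearly equal, contiguous ranges.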
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}

/*!
  Compute distribution of global index for servers
  Each client already holds a piece of information and its attached index.
This information will be redistributed among processes by projecting indices into size_t space.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually server index)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                            const MPI_Comm& commLevel,
                                                            int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel,&nbClient);
  MPI_Comm_rank(commLevel,&clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  int* sendBuff = new int[nbClient];
  int* sendNbIndexBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute the size of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndex;
  std::map<int, std::vector<InfoType> > client2ClientInfo;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename boost::unordered_map<size_t,InfoType>::const_iterator it  = indexInfoMap.begin(),
                                                                 ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndex[indexClient].push_back(it->first);
        client2ClientInfo[indexClient].push_back(it->second);
      }
    }
  }
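  // client2ClientIndex and client2ClientInfo now hold, per destination rank (possibly this rank
  // itself), the (index, info) pairs that rank will own after redistribution; sendBuff flags the
  // destinations and sendNbIndexBuff counts the elements sent to each of them.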

  // Calculate from how many clients each client receives a message.
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbClient = recvBuff[clientRank];

  // Calculate the size of the buffer for receiving messages
  int* recvNbIndexBuff = new int[nbClient];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbIndexCount = recvNbIndexBuff[clientRank];
  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
  unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*ProcessDHTElement<InfoType>::typeSize()];

  // If a client holds indices and corresponding information that do not belong to it,
  // it sends a message to the correct clients.
  // The contents of the message are the indices and their corresponding information
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itIndex  = client2ClientIndex.begin(),
                                                iteIndex = client2ClientIndex.end();
  for (; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest);
  typename std::map<int, std::vector<InfoType> >::iterator itbInfo = client2ClientInfo.begin(), itInfo,
                                                           iteInfo = client2ClientInfo.end();

  std::vector<int> infoSizeToSend(client2ClientInfo.size(),0);
  std::vector<unsigned char*> infoToSend(client2ClientInfo.size());
  itInfo = itbInfo;
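  // Pack each destination's information in two passes: a first packElement pass with a
  // NULL buffer only accumulates the packed size in infoSizeToSend[idx], then the buffer
  // is allocated and a second pass actually writes the elements into it.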
  for (int idx = 0; itInfo != iteInfo; ++itInfo, ++idx)
  {
    const std::vector<InfoType>& infoVec = itInfo->second;
    int infoVecSize = infoVec.size();
    std::vector<int> infoIndex(infoVecSize);
    for (int i = 0; i < infoVecSize; ++i)
    {
      infoIndex[i] = infoSizeToSend[idx];
      ProcessDHTElement<InfoType>::packElement(infoVec[i], NULL, infoSizeToSend[idx]);
    }

    infoToSend[idx] = new unsigned char[infoSizeToSend[idx]];
    infoSizeToSend[idx] = 0;
    for (int i = 0; i < infoVecSize; ++i)
    {
      ProcessDHTElement<InfoType>::packElement(infoVec[i], infoToSend[idx], infoSizeToSend[idx]);
    }

    sendInfoToClients(itInfo->first, infoToSend[idx], infoSizeToSend[idx], commLevel, sendRequest);
  }


  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
  std::map<int, int> countBuffInfo, countBuffIndex;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Count of elements already placed in the global-index receiving buffer
  int countIndex = 0;

  // Count of elements already placed in the info receiving buffer
  int countInfo = 0;

  // Requests returned by MPI_Irecv for global index and info messages
  std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;

  // Mapping between client rank and the beginning position of the receiving buffer for global index messages from this client
  std::map<int, unsigned long*> indexBuffBegin;

  // Mapping between client rank and the beginning position of the receiving buffer for info messages from this client
  std::map<int, unsigned char*> infoBuffBegin;

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;

  // Now each client listens for demands from the others.
  // When a message arrives, it is processed by pushing the global indices and the corresponding information into the map
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    // Process completed requests
    for (itRequestIndex = requestRecvIndex.begin();
         itRequestIndex != requestRecvIndex.end();
         ++itRequestIndex)
    {
      int rank = itRequestIndex->first;
      int count = computeBuffCountIndex(itRequestIndex->second);
      if (0 != count)
        countBuffIndex[rank] = count;
    }

    probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequestInfo = requestRecvInfo.begin();
         itRequestInfo != requestRecvInfo.end();
         ++itRequestInfo)
    {
      int rank = itRequestInfo->first;
      int count = computeBuffCountInfo(itRequestInfo->second);
      if (0 != count)
        countBuffInfo[rank] = count;
    }

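    // A pair of index/info messages from the same source rank is processed only once both
    // messages have fully arrived; the (index, info) pairs are then inserted into indexToInfoMapping.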
    for (std::map<int, int>::iterator it = countBuffIndex.begin();
                                      it != countBuffIndex.end(); ++it)
    {
      int rank = it->first;
      if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
          (countBuffIndex.end() != countBuffIndex.find(rank)))
      {
        int count = it->second;
        InfoType infoValue;
        int infoIndex = 0;
        for (int i = 0; i < count; ++i)
        {
          ProcessDHTElement<InfoType>::unpackElement(infoValue, infoBuffBegin[rank], infoIndex);
          indexToInfoMapping.insert(std::make_pair<size_t,InfoType>(*(indexBuffBegin[rank]+i),infoValue));
        }

        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvInfo.erase(processedList[i]);
      requestRecvIndex.erase(processedList[i]);
      countBuffIndex.erase(processedList[i]);
      countBuffInfo.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  for (int idx = 0; idx < infoToSend.size(); ++idx) delete [] infoToSend[idx];
  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexBuff;
  delete [] recvInfoBuff;

  // Recurse to the next lower level of the hierarchy, or store the result at the lowest level
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->commLevel_[level], level);
  }
  else
    index2InfoMapping_ = indexToInfoMapping;
}

/*!
  Probe and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients to fill their maps.
Each client probes its message queue; if a message is ready, it is received into the receiving buffer
  \param [in] recvIndexBuff buffer dedicated to receiving global indices
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countIndex number of received indices
  \param [in] indexBuffBegin beginning of the index buffer for each source rank
  \param [in] requestRecvIndex receive requests, one per source rank
  \param [in] intraComm communicator
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
                                                                 const int recvNbIndexCount,
                                                                 int& countIndex,
                                                                 std::map<int, unsigned long*>& indexBuffBegin,
                                                                 std::map<int, MPI_Request>& requestRecvIndex,
                                                                 const MPI_Comm& intraComm)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probe for a global index message
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
    countIndex += count;
  }
}

/*!
  Probe and receive messages containing information (e.g. server indices) from other clients.
  Each client can send a message of information to other clients to fill their maps.
Each client probes its message queue; if a message is ready, it is received into the receiving buffer
  \param [in] recvInfoBuff buffer dedicated to receiving the information
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countInfo number of received info elements
  \param [in] infoBuffBegin beginning of the info buffer for each source rank
  \param [in] requestRecvInfo receive requests, one per source rank
  \param [in] intraComm communicator
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::probeInfoMessageFromClients(unsigned char* recvInfoBuff,
                                                                const int recvNbIndexCount,
                                                                int& countInfo,
                                                                std::map<int, unsigned char*>& infoBuffBegin,
                                                                std::map<int, MPI_Request>& requestRecvInfo,
                                                                const MPI_Comm& intraComm)
{
  MPI_Status statusInfo;
  int flagInfo, count;

  // Probe for an info message
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
  {
    MPI_Get_count(&statusInfo, MPI_CHAR, &count);
    unsigned char* beginInfoBuff = recvInfoBuff+countInfo*infoTypeSize;
    infoBuffBegin.insert(std::make_pair<int, unsigned char*>(statusInfo.MPI_SOURCE, beginInfoBuff));
    MPI_Irecv(beginInfoBuff, count, MPI_CHAR,
              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
              &requestRecvInfo[statusInfo.MPI_SOURCE]);

    countInfo += count/infoTypeSize;
  }
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of destination client
  \param [in] indices indices to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of send requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices,
                                                       const MPI_Comm& clientIntraComm,
                                                       std::list<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of destination client
  \param [in] info packed information (e.g. server indices) to send
  \param [in] infoSize size in bytes of the packed information
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of send requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, unsigned char* info, int infoSize,
                       const MPI_Comm& clientIntraComm, std::list<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);

  MPI_Isend(info, infoSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Verify the status of pending send requests
  \param [in] sendRequest list of send requests to test
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::testSendRequest(std::list<MPI_Request>& sendRequest)
{
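  // Repeatedly scan the request list: each pass removes at most one request that
  // MPI_Test reports as completed, and the list is scanned up to its original size.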
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}

/*!
  Compute size of message containing global index
  \param[in] requestRecv request of message
*/
template<typename T, typename H>
int CClientClientDHTTemplate<T,H>::computeBuffCountIndex(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute size of message containing server index
  \param[in] requestRecv request of message
*/
template<typename T, typename H>
int CClientClientDHTTemplate<T,H>::computeBuffCountInfo(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_CHAR, &count);
  }

  return (count/infoTypeSize);
}

}