source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 792

Last change on this file since 792 was 721, checked in by mhnguyen, 8 years ago

Templated version of distributed hashed table

+) Implement DHT in more generic way to work with different type of information
+) Some old codes of DHT are kept to be a reference (they will be deleted soon)

Test
+) On local, mode attached, 8 processes
+) test_remap passes and result is correct

File size: 24.0 KB
Line 
1/*!
2   \file client_client_dht_template_impl.hpp
3   \author Ha NGUYEN
4   \since 05 Oct 2015
5   \date 05 Oct 2015
6
7   \brief Distributed hashed table implementation.
8 */
9#include "client_client_dht_template.hpp"
10#include "utils.hpp"
11#include "mpi_tag.hpp"
12
13namespace xios
14{
15/*!
16  Constructor with initial distribution information and the corresponding index
17  Each client (process) holds a piece of information as well as the attached index, the index
18will be redistributed (projected) into size_t space as long as the associated information.
19  \param [in] indexInfoMap initial index and information mapping
20  \param [in] clientIntraComm communicator of clients
21  \param [in] hierarLvl level of hierarchy
22*/
23template<typename T, typename H>
24CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const boost::unordered_map<size_t,T>& indexInfoMap,
25                                                      const MPI_Comm& clientIntraComm,
26                                                      int hierarLvl)
27  : index2InfoMapping_(), indexToInfoMappingLevel_()
28{
29  this->computeMPICommLevel(clientIntraComm, hierarLvl);
30  int lvl = this->commLevel_.size() - 1;
31  computeDistributedIndex(indexInfoMap, this->commLevel_[lvl], lvl);
32}
33
34template<typename T, typename H>
35CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
36{
37}
38
39/*!
40  Compute mapping between indices and information corresponding to these indices
41  \param [in] indices indices a proc has
42*/
43template<typename T, typename H>
44void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
45{
46  int lvl = this->commLevel_.size() - 1;
47  computeIndexInfoMappingLevel(indices, this->commLevel_[lvl], lvl);
48  size_t size = indices.numElements();
49  for (size_t idx = 0; idx < size; ++idx)
50  {
51    int serverIdx = indexToInfoMappingLevel_[indices(idx)];
52    indexGlobalOnServer_[serverIdx].push_back(indices(idx));
53  }
54}
55
56/*!
57    Compute mapping between indices and information corresponding to these indices
58for each level of hierarchical DHT. Recursive function
59   \param [in] indices indices a proc has
60   \param [in] commLevel communicator of current level
61   \param [in] level current level
62*/
63template<typename T, typename H>
64void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
65                                                                 const MPI_Comm& commLevel,
66                                                                 int level)
67{
68  int nbClient, clientRank;
69  MPI_Comm_size(commLevel,&nbClient);
70  MPI_Comm_rank(commLevel,&clientRank);
71  std::vector<size_t> hashedIndex;
72  computeHashIndex(hashedIndex, nbClient);
73
74  size_t ssize = indices.numElements(), hashedVal;
75
76  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
77                                      iteClientHash = hashedIndex.end();
78  std::map<int, std::vector<size_t> > client2ClientIndex;
79
80  // Number of global index whose mapping server are on other clients
81  int nbIndexToSend = 0;
82  HashXIOS<size_t> hashGlobalIndex;
83  for (int i = 0; i < ssize; ++i)
84  {
85    size_t index = indices(i);
86    hashedVal  = hashGlobalIndex(index);
87    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
88    if (iteClientHash != itClientHash)
89    {
90      int indexClient = std::distance(itbClientHash, itClientHash)-1;
91      {
92        client2ClientIndex[indexClient].push_back(index);
93        ++nbIndexToSend;
94      }
95    }
96  }
97
98  int* sendBuff = new int[nbClient];
99  for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0;
100  std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndex.begin(), it,
101                                                ite  = client2ClientIndex.end();
102  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
103  int* recvBuff = new int[nbClient];
104  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
105
106  std::list<MPI_Request> sendIndexRequest;
107  if (0 != nbIndexToSend)
108      for (it = itb; it != ite; ++it)
109         sendIndexToClients(it->first, it->second, commLevel, sendIndexRequest);
110
111  int nbDemandingClient = recvBuff[clientRank], nbSendBuffInfoReceived = 0;
112
113  // Receiving demand as well as the responds from other clients
114  // The demand message contains global index; meanwhile the responds have server index information
115  // Buffer to receive demand from other clients, it can be allocated or not depending whether it has demand(s)
116  // There are some cases we demand duplicate index so need to determine maxium size of demanding buffer
117  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
118  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
119
120  unsigned long* recvBuffIndex = 0;
121  int maxNbIndexDemandedFromOthers = recvBuff[clientRank];
122
123  if (0 != maxNbIndexDemandedFromOthers)
124    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];
125
126  // Buffer to receive respond from other clients, it can be allocated or not depending whether it demands other clients
127  InfoType* recvBuffInfo = 0;
128  int nbIndexReceivedFromOthers = nbIndexToSend;
129  if (0 != nbIndexReceivedFromOthers)
130    recvBuffInfo = new InfoType[nbIndexReceivedFromOthers];
131
132  std::map<int, MPI_Request>::iterator itRequest;
133  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;
134
135
136  int countIndex = 0;  // Counting of buffer for receiving index
137  std::map<int, MPI_Request> requestRecvIndex; // Request returned by MPI_IRecv function about index
138
139  // Mapping client rank and the beginning position of receiving buffer for message of index from this client
140  std::map<int, unsigned long*> indexBuffBegin;
141
142  std::map<int,std::vector<size_t> > src2Index; // Temporary mapping contains info of demand (source and associate index) in curren level
143
144  CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);
145
146  int k = 0;
147  while ((0 < nbDemandingClient) || (!sendIndexRequest.empty()))
148  {
149    // Just check whether a client has any demand from other clients.
150    // If it has, then it should send responds to these client(s)
151    probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
152                                 countIndex, indexBuffBegin,
153                                 requestRecvIndex, commLevel);
154    if (0 < nbDemandingClient)
155    {
156      for (itRequest = requestRecvIndex.begin();
157           itRequest != requestRecvIndex.end(); ++itRequest)
158      {
159        int flagIndexGlobal, count;
160        MPI_Status statusIndexGlobal;
161
162        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
163        if (true == flagIndexGlobal)
164        {
165          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
166          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
167          unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
168          for (int i = 0; i < count; ++i)
169          {
170            src2Index[clientSourceRank].push_back(*(beginBuff+i));
171            tmpGlobalIndexOnClient(k) = *(beginBuff+i);
172            ++k;
173          }
174          --nbDemandingClient;
175
176          demandAlreadyReceived.push_back(clientSourceRank);
177        }
178      }
179      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
180        requestRecvIndex.erase(demandAlreadyReceived[i]);
181    }
182
183    testSendRequest(sendIndexRequest);
184  }
185
186  if (0 < level)
187  {
188    --level;
189    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->commLevel_[level], level);
190  }
191  else
192    indexToInfoMappingLevel_ = index2InfoMapping_;
193
194  std::map<int, std::vector<InfoType> > client2ClientInfo;
195  std::list<MPI_Request> sendInfoRequest;
196  std::map<int, std::vector<size_t> >::iterator itbSrc2Idx = src2Index.begin(), itSrc2Idx,
197                                                iteSrc2Idx = src2Index.end();
198  for (itSrc2Idx = itbSrc2Idx; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx)
199  {
200    int clientSourceRank = itSrc2Idx->first;
201    std::vector<size_t>& srcIdx = itSrc2Idx->second;
202    for (int idx = 0; idx < srcIdx.size(); ++idx)
203    {
204      client2ClientInfo[clientSourceRank].push_back(indexToInfoMappingLevel_[srcIdx[idx]]);
205    }
206    sendInfoToClients(clientSourceRank, client2ClientInfo[clientSourceRank], commLevel, sendInfoRequest);
207  }
208
209  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
210  int countInfo = 0; // Counting of buffer for receiving server index
211  std::map<int, MPI_Request> requestRecvInfo;
212
213  // Mapping client rank and the begining position of receiving buffer for message of server index from this client
214  std::map<int, InfoType*> infoBuffBegin;
215
216  while ((!sendInfoRequest.empty()) || (nbSendBuffInfoReceived < nbIndexReceivedFromOthers))
217  {
218    testSendRequest(sendInfoRequest);
219
220    // In some cases, a client need to listen respond from other clients about server information
221    // Ok, with the information, a client can fill in its server-global index map.
222    probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
223                                countInfo, infoBuffBegin,
224                                requestRecvInfo, commLevel);
225    for (itRequest = requestRecvInfo.begin();
226         itRequest != requestRecvInfo.end();
227         ++itRequest)
228    {
229      int flagInfo, count;
230      MPI_Status statusInfo;
231
232      MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
233      if (true == flagInfo)
234      {
235        MPI_Get_count(&statusInfo, MPI_INT, &count);
236        int clientSourceRank = statusInfo.MPI_SOURCE;
237        InfoType* beginBuff = infoBuffBegin[clientSourceRank];
238        std::vector<size_t>& indexTmp = client2ClientIndex[clientSourceRank];
239        for (int i = 0; i < count; ++i)
240        {
241          indexToInfoMapping[indexTmp[i]] = *(beginBuff+i);
242        }
243        nbSendBuffInfoReceived += count;
244        repondAlreadyReceived.push_back(clientSourceRank);
245      }
246    }
247
248    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
249      requestRecvInfo.erase(repondAlreadyReceived[i]);
250    repondAlreadyReceived.resize(0);
251  }
252
253  indexToInfoMappingLevel_ = indexToInfoMapping;
254  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
255  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
256  delete [] sendBuff;
257  delete [] recvBuff;
258}
259
260/*!
261  Compute the hash index distribution of whole size_t space then each client will have a range of this distribution
262*/
263template<typename T, typename H>
264void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
265{
266  // Compute range of hash index for each client
267  hashedIndex.resize(nbClient+1);
268  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
269  size_t nbHashIndex;
270  hashedIndex[0] = 0;
271  for (int i = 1; i < nbClient; ++i)
272  {
273    nbHashIndex = nbHashIndexMax / nbClient;
274    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
275    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
276  }
277  hashedIndex[nbClient] = nbHashIndexMax;
278}
279
280/*!
281  Compute distribution of global index for servers
282  Each client already holds a piece of information and its attached index.
283This information will be redistributed among processes by projecting indices into size_t space.
284After the redistribution, each client holds rearranged index and its corresponding information.
285  \param [in] indexInfoMap index and its corresponding info (usually server index)
286  \param [in] commLevel communicator of current level
287  \param [in] level current level
288*/
289template<typename T, typename H>
290void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
291                                                          const MPI_Comm& commLevel,
292                                                          int level)
293{
294  int nbClient, clientRank;
295  MPI_Comm_size(commLevel,&nbClient);
296  MPI_Comm_rank(commLevel,&clientRank);
297  std::vector<size_t> hashedIndex;
298  computeHashIndex(hashedIndex, nbClient);
299
300  int* sendBuff = new int[nbClient];
301  int* sendNbIndexBuff = new int[nbClient];
302  for (int i = 0; i < nbClient; ++i)
303  {
304    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
305  }
306
307  // Compute size of sending and receving buffer
308  std::map<int, std::vector<size_t> > client2ClientIndex;
309  std::map<int, std::vector<InfoType> > client2ClientInfo;
310
311  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
312                                      iteClientHash = hashedIndex.end();
313  typename boost::unordered_map<size_t,InfoType>::const_iterator it  = indexInfoMap.begin(),
314                                                                 ite = indexInfoMap.end();
315  HashXIOS<size_t> hashGlobalIndex;
316  for (; it != ite; ++it)
317  {
318    size_t hashIndex = hashGlobalIndex(it->first);
319    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
320    if (itClientHash != iteClientHash)
321    {
322      int indexClient = std::distance(itbClientHash, itClientHash)-1;
323      {
324        sendBuff[indexClient] = 1;
325        ++sendNbIndexBuff[indexClient];
326        client2ClientIndex[indexClient].push_back(it->first);
327        client2ClientInfo[indexClient].push_back(it->second);
328      }
329    }
330  }
331
332  // Calculate from how many clients each client receive message.
333  int* recvBuff = new int[nbClient];
334  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
335  int recvNbClient = recvBuff[clientRank];
336
337  // Calculate size of buffer for receiving message
338  int* recvNbIndexBuff = new int[nbClient];
339  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
340  int recvNbIndexCount = recvNbIndexBuff[clientRank];
341  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
342  InfoType* recvInfoBuff = new InfoType[recvNbIndexCount];
343
344  // If a client holds information about index and the corresponding which don't belong to it,
345  // it will send a message to the correct clients.
346  // Contents of the message are index and its corresponding informatioin
347  std::list<MPI_Request> sendRequest;
348  std::map<int, std::vector<size_t> >::iterator itIndex  = client2ClientIndex.begin(),
349                                                iteIndex = client2ClientIndex.end();
350  for (; itIndex != iteIndex; ++itIndex)
351    sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest);
352  typename std::map<int, std::vector<InfoType> >::iterator itInfo  = client2ClientInfo.begin(),
353                                                           iteInfo = client2ClientInfo.end();
354  for (; itInfo != iteInfo; ++itInfo)
355    sendInfoToClients(itInfo->first, itInfo->second, commLevel, sendRequest);
356
357  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
358  std::map<int, int> countBuffInfo, countBuffIndex;
359  std::vector<int> processedList;
360
361  bool isFinished = (0 == recvNbClient) ? true : false;
362
363  // Counting of buffer for receiving global index
364  int countIndex = 0;
365
366  // Counting of buffer for receiving server index
367  int countInfo = 0;
368
369  // Request returned by MPI_IRecv function about global index
370  std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;
371
372  // Mapping client rank and the beginning position of receiving buffer for message of global index from this client
373  std::map<int, unsigned long*> indexBuffBegin;
374
375  // Mapping client rank and the begining position of receiving buffer for message of server index from this client
376  std::map<int, int*> infoBuffBegin;
377
378  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
379
380  // Now each client trys to listen to demand from others.
381  // If they have message, it processes: pushing global index and corresponding server to its map
382  while (!isFinished || (!sendRequest.empty()))
383  {
384    testSendRequest(sendRequest);
385    probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
386                                 countIndex, indexBuffBegin,
387                                 requestRecvIndex, commLevel);
388    // Processing complete request
389    for (itRequestIndex = requestRecvIndex.begin();
390         itRequestIndex != requestRecvIndex.end();
391         ++itRequestIndex)
392    {
393      int rank = itRequestIndex->first;
394      int count = computeBuffCountIndex(itRequestIndex->second);
395      if (0 != count)
396        countBuffIndex[rank] = count;
397    }
398
399    probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
400                                countInfo, infoBuffBegin,
401                                requestRecvInfo, commLevel);
402    for (itRequestInfo = requestRecvInfo.begin();
403         itRequestInfo != requestRecvInfo.end();
404         ++itRequestInfo)
405    {
406      int rank = itRequestInfo->first;
407      int count = computeBuffCountInfo(itRequestInfo->second);
408      if (0 != count)
409        countBuffInfo[rank] = count;
410    }
411
412    for (std::map<int, int>::iterator it = countBuffIndex.begin();
413                                      it != countBuffIndex.end(); ++it)
414    {
415      int rank = it->first;
416      if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
417          (countBuffIndex.end() != countBuffIndex.find(rank)))
418      {
419        int count = it->second;
420        for (int i = 0; i < count; ++i)
421           indexToInfoMapping.insert(std::make_pair<size_t,InfoType>(*(indexBuffBegin[rank]+i),*(infoBuffBegin[rank]+i)));
422        processedList.push_back(rank);
423        --recvNbClient;
424      }
425    }
426
427    for (int i = 0; i < processedList.size(); ++i)
428    {
429      requestRecvInfo.erase(processedList[i]);
430      requestRecvIndex.erase(processedList[i]);
431      countBuffIndex.erase(processedList[i]);
432      countBuffInfo.erase(processedList[i]);
433    }
434
435    if (0 == recvNbClient) isFinished = true;
436  }
437
438  delete [] sendBuff;
439  delete [] sendNbIndexBuff;
440  delete [] recvBuff;
441  delete [] recvNbIndexBuff;
442  delete [] recvIndexBuff;
443  delete [] recvInfoBuff;
444
445  // Ok, now do something recursive
446  if (0 < level)
447  {
448    --level;
449    computeDistributedIndex(indexToInfoMapping, this->commLevel_[level], level);
450  }
451  else
452    index2InfoMapping_ = indexToInfoMapping;
453}
454
455/*!
456  Probe and receive message containg global index from other clients.
457  Each client can send a message of global index to other clients to fulfill their maps.
458Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer
459  \param [in] recvIndexBuff buffer dedicated for receiving global index
460  \param [in] recvNbIndexCount size of the buffer
461  \param [in] countIndex number of received index
462  \param [in] indexBuffBegin beginning of index buffer for each source rank
463  \param [in] requestRecvIndex request of receving index
464  \param [in] intraComm communicator
465*/
466template<typename T, typename H>
467void CClientClientDHTTemplate<T,H>::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
468                                                    const int recvNbIndexCount,
469                                                    int& countIndex,
470                                                    std::map<int, unsigned long*>& indexBuffBegin,
471                                                    std::map<int, MPI_Request>& requestRecvIndex,
472                                                    const MPI_Comm& intraComm)
473{
474  MPI_Status statusIndexGlobal;
475  int flagIndexGlobal, count;
476
477  // Probing for global index
478  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
479  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
480  {
481    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
482    indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
483    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
484              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
485              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
486    countIndex += count;
487  }
488}
489
490/*!
491  Probe and receive message containg server index from other clients.
492  Each client can send a message of server index to other clients to fulfill their maps.
493Each client probes message from its queue then if the message is ready, it will be put into the receiving buffer
494  \param [in] recvInfoBuff buffer dedicated for receiving server index
495  \param [in] recvNbIndexCount size of the buffer
496  \param [in] countInfo number of received info
497  \param [in] infoBuffBegin beginning of index buffer for each source rank
498  \param [in] requestRecvInfo request of receving index
499  \param [in] intraComm communicator
500*/
501template<typename T, typename H>
502void CClientClientDHTTemplate<T,H>::probeInfoMessageFromClients(T* recvInfoBuff,
503                                                   const int recvNbIndexCount,
504                                                   int& countInfo,
505                                                   std::map<int, T*>& infoBuffBegin,
506                                                   std::map<int, MPI_Request>& requestRecvInfo,
507                                                   const MPI_Comm& intraComm)
508{
509  MPI_Status statusInfo;
510  int flagInfo, count;
511
512  // Probing for server index
513  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
514  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
515  {
516    MPI_Get_count(&statusInfo, MPI_CHAR, &count);
517    infoBuffBegin.insert(std::make_pair<int, T*>(statusInfo.MPI_SOURCE, recvInfoBuff+countInfo));
518    MPI_Irecv(recvInfoBuff+countInfo, count, MPI_CHAR,
519              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
520              &requestRecvInfo[statusInfo.MPI_SOURCE]);
521
522    countInfo += count/infoTypeSize;
523  }
524}
525
526/*!
527  Send message containing index to clients
528  \param [in] clientDestRank rank of destination client
529  \param [in] indices index to send
530  \param [in] clientIntraComm communication group of client
531  \param [in] requestSendIndex list of sending request
532*/
533template<typename T, typename H>
534void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices,
535                                          const MPI_Comm& clientIntraComm,
536                                          std::list<MPI_Request>& requestSendIndex)
537{
538  MPI_Request request;
539  requestSendIndex.push_back(request);
540  MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG,
541            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
542}
543
544/*!
545  Send message containing information to clients
546  \param [in] clientDestRank rank of destination client
547  \param [in] info server index to send
548  \param [in] clientIntraComm communication group of client
549  \param [in] requestSendInfo list of sending request
550*/
551template<typename T, typename H>
552void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, std::vector<T>& info,
553                                         const MPI_Comm& clientIntraComm,
554                                         std::list<MPI_Request>& requestSendInfo)
555{
556  MPI_Request request;
557  requestSendInfo.push_back(request);
558
559  MPI_Isend(&(info)[0], info.size() * infoTypeSize, MPI_CHAR,
560            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
561}
562
563/*!
564  Verify status of sending request
565  \param [in] sendRequest sending request to verify
566*/
567template<typename T, typename H>
568void CClientClientDHTTemplate<T,H>::testSendRequest(std::list<MPI_Request>& sendRequest)
569{
570  int flag = 0;
571  MPI_Status status;
572  std::list<MPI_Request>::iterator itRequest;
573  int sizeListRequest = sendRequest.size();
574  int idx = 0;
575  while (idx < sizeListRequest)
576  {
577    bool isErased = false;
578    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
579    {
580      MPI_Test(&(*itRequest), &flag, &status);
581      if (true == flag)
582      {
583        isErased = true;
584        break;
585      }
586    }
587    if (true == isErased) sendRequest.erase(itRequest);
588    ++idx;
589  }
590}
591
592/*!
593  Compute size of message containing global index
594  \param[in] requestRecv request of message
595*/
596template<typename T, typename H>
597int CClientClientDHTTemplate<T,H>::computeBuffCountIndex(MPI_Request& requestRecv)
598{
599  int flag, count = 0;
600  MPI_Status status;
601
602  MPI_Test(&requestRecv, &flag, &status);
603  if (true == flag)
604  {
605    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
606  }
607
608  return count;
609}
610
611/*!
612  Compute size of message containing server index
613  \param[in] requestRecv request of message
614*/
615template<typename T, typename H>
616int CClientClientDHTTemplate<T,H>::computeBuffCountInfo(MPI_Request& requestRecv)
617{
618  int flag, count = 0;
619  MPI_Status status;
620
621  MPI_Test(&requestRecv, &flag, &status);
622  if (true == flag)
623  {
624    MPI_Get_count(&status, MPI_CHAR, &count);
625  }
626
627  return (count/infoTypeSize);
628}
629
630}
Note: See TracBrowser for help on using the repository browser.