source: XIOS/trunk/src/client_client_dht_template_impl.hpp @ 829

Last change on this file since 829 was 829, checked in by mhnguyen, 8 years ago

Refactoring transformation code

+) On exchanging information during transformation, not only global indices are sent but also local indices
+) Correct a bug in distributed hash table (dht)
+) Add new type for dht
+) Clean up some redundant code

Test
+) On Curie
+) Every test passes
+) Code runs faster in some cases (up to 30%)

File size: 24.9 KB
/*!
   \file client_client_dht_template_impl.hpp
   \author Ha NGUYEN
   \since 05 Oct 2015
   \date 05 Oct 2015

   \brief Distributed hash table implementation.
 */
#include "client_client_dht_template.hpp"
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{
/*!
  Constructor with initial distribution information and the corresponding indices
  Each client (process) holds a piece of information as well as the attached indices; the indices
will be redistributed (projected) into size_t space along with the associated information.
  \param [in] indexInfoMap initial index and information mapping
  \param [in] clientIntraComm communicator of clients
  \param [in] hierarLvl level of hierarchy
*/
template<typename T, typename H>
CClientClientDHTTemplate<T,H>::CClientClientDHTTemplate(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                      const MPI_Comm& clientIntraComm,
                                                      int hierarLvl)
  : index2InfoMapping_(), indexToInfoMappingLevel_()
{
  this->computeMPICommLevel(clientIntraComm, hierarLvl);
  int lvl = this->commLevel_.size() - 1;
  computeDistributedIndex(indexInfoMap, this->commLevel_[lvl], lvl);
}
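
// A minimal construction sketch (comment only, not compiled here). The concrete info type,
// hash functor, communicator and hierarchy level below are illustrative assumptions, not
// values taken from this file:
//
//   boost::unordered_map<size_t,int> indexInfoMap;   // global index -> info held by this process
//   indexInfoMap[42] = 3;                            // e.g. index 42 maps to server 3
//   CClientClientDHTTemplate<int,HashXIOS<size_t> > dht(indexInfoMap, clientIntraComm, 2);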

template<typename T, typename H>
CClientClientDHTTemplate<T,H>::~CClientClientDHTTemplate()
{
}

/*!
  Compute the mapping between indices and the information corresponding to these indices
  \param [in] indices indices held by this process
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMapping(const CArray<size_t,1>& indices)
{
  int lvl = this->commLevel_.size() - 1;
  computeIndexInfoMappingLevel(indices, this->commLevel_[lvl], lvl);
  size_t size = indices.numElements();
  for (size_t idx = 0; idx < size; ++idx)
  {
    T& info = indexToInfoMappingLevel_[indices(idx)];
//    infoIndexMapping_[info].push_back(indices(idx));
    infoIndexMapping_[indices(idx)] = info;
  }
}
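
// Query sketch following the construction example above (comment only): any process may ask
// for the information attached to indices it does not hold locally; the DHT routes each
// request to the process whose hash range contains the index.
//
//   CArray<size_t,1> queried(2);
//   queried(0) = 42; queried(1) = 1337;      // indices possibly stored on other processes
//   dht.computeIndexInfoMapping(queried);    // fills infoIndexMapping_ for these indices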

/*!
    Compute the mapping between indices and the information corresponding to these indices
for each level of the hierarchical DHT. Recursive function
   \param [in] indices indices held by this process
   \param [in] commLevel communicator of the current level
   \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeIndexInfoMappingLevel(const CArray<size_t,1>& indices,
                                                                 const MPI_Comm& commLevel,
                                                                 int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel,&nbClient);
  MPI_Comm_rank(commLevel,&clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::map<int, std::vector<size_t> > client2ClientIndex;

  // Number of global indices whose mapping server is on other clients
  int nbIndexToSend = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient].push_back(index);
        ++nbIndexToSend;
      }
    }
  }

  int* sendBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndex.begin(), it,
                                                ite  = client2ClientIndex.end();
  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);

  std::list<MPI_Request> sendIndexRequest;
  if (0 != nbIndexToSend)
      for (it = itb; it != ite; ++it)
         sendIndexToClients(it->first, it->second, commLevel, sendIndexRequest);

  int nbDemandingClient = recvBuff[clientRank], nbSendBuffInfoReceived = 0;

  // Receiving demands as well as the responses from other clients
  // The demand message contains global indices; meanwhile the responses carry the corresponding information (e.g. server indices)
  // Buffer to receive demands from other clients; it may or may not be allocated, depending on whether there are any demands
  // In some cases the same index is demanded several times, so the maximum size of the demand buffer must be determined
  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);

  unsigned long* recvBuffIndex = 0;
  int maxNbIndexDemandedFromOthers = recvBuff[clientRank];

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer to receive responses from other clients; it may or may not be allocated, depending on whether this client demands anything from others
//  InfoType* recvBuffInfo = 0;
  unsigned char* recvBuffInfo = 0;
  int nbIndexReceivedFromOthers = nbIndexToSend;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffInfo = new unsigned char[nbIndexReceivedFromOthers*infoTypeSize];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;


  int countIndex = 0;  // Counter of the buffer for receiving indices
  std::map<int, MPI_Request> requestRecvIndex; // Requests returned by MPI_Irecv for index messages

  // Mapping between client rank and the beginning position of the receiving buffer for the index message from this client
  std::map<int, unsigned long*> indexBuffBegin;

  std::map<int,std::vector<size_t> > src2Index; // Temporary mapping containing the demands (source rank and associated indices) at the current level

  CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);

  int k = 0;
  while ((0 < nbDemandingClient) || (!sendIndexRequest.empty()))
  {
    // Just check whether a client has any demand from other clients.
    // If it has, then it should send responses to these client(s)
    probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndex.begin();
           itRequest != requestRecvIndex.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            src2Index[clientSourceRank].push_back(*(beginBuff+i));
            tmpGlobalIndexOnClient(k) = *(beginBuff+i);
            ++k;
          }
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
        requestRecvIndex.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendIndexRequest);
  }

  if (0 < level)
  {
    --level;
    computeIndexInfoMappingLevel(tmpGlobalIndexOnClient, this->commLevel_[level], level);
  }
  else
    indexToInfoMappingLevel_ = index2InfoMapping_;

  std::map<int, std::vector<InfoType> > client2ClientInfo;
  std::list<MPI_Request> sendInfoRequest;
  std::map<int, std::vector<size_t> >::iterator itbSrc2Idx = src2Index.begin(), itSrc2Idx,
                                                iteSrc2Idx = src2Index.end();
  for (itSrc2Idx = itbSrc2Idx; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx)
  {
    int clientSourceRank = itSrc2Idx->first;
    std::vector<size_t>& srcIdx = itSrc2Idx->second;
    for (int idx = 0; idx < srcIdx.size(); ++idx)
    {
      client2ClientInfo[clientSourceRank].push_back(indexToInfoMappingLevel_[srcIdx[idx]]);
    }
    sendInfoToClients(clientSourceRank, client2ClientInfo[clientSourceRank], commLevel, sendInfoRequest);
  }

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;
  int countInfo = 0; // Counter of the buffer for receiving info
  std::map<int, MPI_Request> requestRecvInfo;

  // Mapping between client rank and the beginning position of the receiving buffer for the info message from this client
  std::map<int, unsigned char*> infoBuffBegin;

  while ((!sendInfoRequest.empty()) || (nbSendBuffInfoReceived < nbIndexReceivedFromOthers))
  {
    testSendRequest(sendInfoRequest);

    // In some cases, a client needs to listen for responses from other clients about server information.
    // With this information, the client can fill in its server-global index map.
    probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequest = requestRecvInfo.begin();
         itRequest != requestRecvInfo.end();
         ++itRequest)
    {
      int flagInfo, count;
      MPI_Status statusInfo;

      MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
      if (true == flagInfo)
      {
        MPI_Get_count(&statusInfo, MPI_CHAR, &count);
        int actualCountInfo = count/infoTypeSize;
        int clientSourceRank = statusInfo.MPI_SOURCE;
        unsigned char* beginBuff = infoBuffBegin[clientSourceRank];
        std::vector<size_t>& indexTmp = client2ClientIndex[clientSourceRank];
        TypeToBytes<InfoType> u;
        for (int i = 0; i < actualCountInfo; ++i)
        {
          unsigned char* tmpBeginBuff = beginBuff+i*infoTypeSize;
          for (size_t idx = 0; idx < infoTypeSize; ++idx) u.bytes[idx] = *(tmpBeginBuff+idx);
          indexToInfoMapping[indexTmp[i]] = u.value;
        }
        nbSendBuffInfoReceived += actualCountInfo;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
      requestRecvInfo.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  indexToInfoMappingLevel_.swap(indexToInfoMapping);
  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
  delete [] sendBuff;
  delete [] recvBuff;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client will then own one range of this distribution
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute the range of hash indices for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}
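
// Worked example (comment only, using an artificially small range instead of the full size_t
// space): for nbClient = 3 and a maximum hash value of 10, the loop above gives the boundaries
// hashedIndex = {0, 3, 6, 10}. A hashed index is then assigned to a client with
// std::upper_bound (see computeIndexInfoMappingLevel), so a hash of 5 belongs to client 1,
// a hash of 7 to client 2, and the remainder of the division is absorbed by the last range.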

/*!
  Compute the distribution of global indices for servers
  Each client already holds a piece of information and its attached indices.
This information will be redistributed among the processes by projecting the indices into size_t space.
After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap indices and their corresponding info (usually server indices)
  \param [in] commLevel communicator of current level
  \param [in] level current level
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::computeDistributedIndex(const boost::unordered_map<size_t,T>& indexInfoMap,
                                                            const MPI_Comm& commLevel,
                                                            int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel,&nbClient);
  MPI_Comm_rank(commLevel,&clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  int* sendBuff = new int[nbClient];
  int* sendNbIndexBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute the sizes of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndex;
  std::map<int, std::vector<InfoType> > client2ClientInfo;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  typename boost::unordered_map<size_t,InfoType>::const_iterator it  = indexInfoMap.begin(),
                                                                 ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndex[indexClient].push_back(it->first);
        client2ClientInfo[indexClient].push_back(it->second);
      }
    }
  }

  // Calculate from how many clients each client receives messages.
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbClient = recvBuff[clientRank];

  // Calculate the size of the buffer for receiving messages
  int* recvNbIndexBuff = new int[nbClient];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbIndexCount = recvNbIndexBuff[clientRank];
  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
  unsigned char* recvInfoBuff = new unsigned char[recvNbIndexCount*infoTypeSize];

  // If a client holds indices and the corresponding information which don't belong to it,
  // it will send a message to the correct clients.
  // The contents of the message are the indices and their corresponding information
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itIndex  = client2ClientIndex.begin(),
                                                iteIndex = client2ClientIndex.end();
  for (; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest);
  typename std::map<int, std::vector<InfoType> >::iterator itInfo  = client2ClientInfo.begin(),
                                                           iteInfo = client2ClientInfo.end();
  for (; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, commLevel, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
  std::map<int, int> countBuffInfo, countBuffIndex;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Counter of the buffer for receiving global indices
  int countIndex = 0;

  // Counter of the buffer for receiving info (server indices)
  int countInfo = 0;

  // Requests returned by MPI_Irecv for global index and info messages
  std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;

  // Mapping between client rank and the beginning position of the receiving buffer for the global index message from this client
  std::map<int, unsigned long*> indexBuffBegin;

  // Mapping between client rank and the beginning position of the receiving buffer for the info message from this client
  std::map<int, unsigned char*> infoBuffBegin;

  boost::unordered_map<size_t,InfoType> indexToInfoMapping;

  // Now each client tries to listen for demands from others.
  // If a message arrives, it is processed: the global index and its corresponding server are pushed into its map
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    // Process completed requests
    for (itRequestIndex = requestRecvIndex.begin();
         itRequestIndex != requestRecvIndex.end();
         ++itRequestIndex)
    {
      int rank = itRequestIndex->first;
      int count = computeBuffCountIndex(itRequestIndex->second);
      if (0 != count)
        countBuffIndex[rank] = count;
    }

    probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequestInfo = requestRecvInfo.begin();
         itRequestInfo != requestRecvInfo.end();
         ++itRequestInfo)
    {
      int rank = itRequestInfo->first;
      int count = computeBuffCountInfo(itRequestInfo->second);
      if (0 != count)
        countBuffInfo[rank] = count;
    }

    for (std::map<int, int>::iterator it = countBuffIndex.begin();
                                      it != countBuffIndex.end(); ++it)
    {
      int rank = it->first;
      if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
          (countBuffIndex.end() != countBuffIndex.find(rank)))
      {
        int count = it->second;
        TypeToBytes<InfoType> u;
        for (int i = 0; i < count; ++i)
        {
          unsigned char* tmpBeginBuff = infoBuffBegin[rank]+i*infoTypeSize;
          for (size_t idx = 0; idx < infoTypeSize; ++idx) u.bytes[idx] = *(tmpBeginBuff+idx);
          indexToInfoMapping.insert(std::make_pair<size_t,InfoType>(*(indexBuffBegin[rank]+i),u.value));
        }

        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvInfo.erase(processedList[i]);
      requestRecvIndex.erase(processedList[i]);
      countBuffIndex.erase(processedList[i]);
      countBuffInfo.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexBuff;
  delete [] recvInfoBuff;

  // Recurse to the next level of the hierarchy
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, this->commLevel_[level], level);
  }
  else
    index2InfoMapping_ = indexToInfoMapping;
}
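
// Illustrative scenario (comment only, ranks and values are made up): with 2 clients at the
// current level, suppose client 0 holds the pair (100 -> serverA) while hashGlobalIndex(100)
// falls into client 1's hash range. Client 0 then sends index 100 and the serialized info to
// client 1, which stores (100 -> serverA) in its index2InfoMapping_; a later lookup of index
// 100 by any client is routed to client 1 through the same hash ranges.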

/*!
  Probe and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients to fill their maps.
Each client probes for messages in its queue; if a message is ready, it will be put into the receiving buffer
  \param [in] recvIndexBuff buffer dedicated for receiving global indices
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countIndex number of received indices
  \param [in] indexBuffBegin beginning of the index buffer for each source rank
  \param [in] requestRecvIndex requests of receiving indices
  \param [in] intraComm communicator
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
                                                                 const int recvNbIndexCount,
                                                                 int& countIndex,
                                                                 std::map<int, unsigned long*>& indexBuffBegin,
                                                                 std::map<int, MPI_Request>& requestRecvIndex,
                                                                 const MPI_Comm& intraComm)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global index
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
    countIndex += count;
  }
}
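
/*
  The probe-then-receive pattern used above, shown standalone (illustrative sketch only; the
  tag value 7 and the caller-provided buffer are assumptions, not taken from XIOS):

    #include <mpi.h>

    void recvAnyIndexMessage(MPI_Comm comm, unsigned long* buffer, int offset, MPI_Request* req)
    {
      int flag = 0, count = 0;
      MPI_Status status;
      MPI_Iprobe(MPI_ANY_SOURCE, 7, comm, &flag, &status);   // non-blocking check for a pending message
      if (flag)
      {
        MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);   // size of the pending message
        MPI_Irecv(buffer + offset, count, MPI_UNSIGNED_LONG, // receive it where it fits in the buffer
                  status.MPI_SOURCE, 7, comm, req);
      }
    }

  MPI_Iprobe only reports that a matching message is pending; the MPI_Irecv with the same
  source and tag is what actually transfers it into the buffer.
*/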

/*!
  Probe and receive messages containing info (usually server indices) from other clients.
  Each client can send a message of info to other clients to fill their maps.
Each client probes for messages in its queue; if a message is ready, it will be put into the receiving buffer
  \param [in] recvInfoBuff buffer dedicated for receiving info
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countInfo number of received info elements
  \param [in] infoBuffBegin beginning of the info buffer for each source rank
  \param [in] requestRecvInfo requests of receiving info
  \param [in] intraComm communicator
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::probeInfoMessageFromClients(unsigned char* recvInfoBuff,
                                                                const int recvNbIndexCount,
                                                                int& countInfo,
                                                                std::map<int, unsigned char*>& infoBuffBegin,
                                                                std::map<int, MPI_Request>& requestRecvInfo,
                                                                const MPI_Comm& intraComm)
{
  MPI_Status statusInfo;
  int flagInfo, count;

  // Probing for server index
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
  {
    MPI_Get_count(&statusInfo, MPI_CHAR, &count);
    unsigned char* beginInfoBuff = recvInfoBuff+countInfo*infoTypeSize;
    infoBuffBegin.insert(std::make_pair<int, unsigned char*>(statusInfo.MPI_SOURCE, beginInfoBuff));
    MPI_Irecv(beginInfoBuff, count, MPI_CHAR,
              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
              &requestRecvInfo[statusInfo.MPI_SOURCE]);

    countInfo += count/infoTypeSize;
  }
}

/*!
  Send a message containing indices to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indices indices to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices,
                                                       const MPI_Comm& clientIntraComm,
                                                       std::list<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Send a message containing information to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] info info (usually server indices) to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::sendInfoToClients(int clientDestRank, std::vector<T>& info,
                                                      const MPI_Comm& clientIntraComm,
                                                      std::list<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);

  MPI_Isend(&(info)[0], info.size() * infoTypeSize, MPI_CHAR,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}
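
/*
  The info payload travels as raw bytes: the sender reinterprets the contiguous std::vector<T>
  storage as info.size()*infoTypeSize chars, and the receiver rebuilds each value byte by byte
  with TypeToBytes (defined elsewhere). A minimal sketch of that round trip with a plain union
  as a stand-in (illustrative only; it assumes a trivially copyable info type, here int):

    #include <cstddef>

    union IntToBytes { int value; unsigned char bytes[sizeof(int)]; };

    void encode(int v, unsigned char* buff)                  // write the bytes of v into buff
    {
      IntToBytes u; u.value = v;
      for (std::size_t i = 0; i < sizeof(int); ++i) buff[i] = u.bytes[i];
    }

    int decode(const unsigned char* buff)                    // read an int back from buff
    {
      IntToBytes u;
      for (std::size_t i = 0; i < sizeof(int); ++i) u.bytes[i] = buff[i];
      return u.value;
    }

  This only works for trivially copyable types and assumes both ends share the same
  representation (endianness, sizeof), which holds within one intracommunicator on
  homogeneous hardware.
*/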

/*!
  Verify status of sending request
  \param [in] sendRequest sending request to verify
*/
template<typename T, typename H>
void CClientClientDHTTemplate<T,H>::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}
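
/*
  Note: MPI_Test frees a completed request and resets it to MPI_REQUEST_NULL, so erasing the
  finished entry leaves only pending sends in the list. An equivalent single-pass idiom
  (illustrative sketch, not the code used here):

    for (std::list<MPI_Request>::iterator it = sendRequest.begin(); it != sendRequest.end(); )
    {
      int flag = 0;
      MPI_Status status;
      MPI_Test(&(*it), &flag, &status);         // completes and frees the request if it is done
      if (flag) it = sendRequest.erase(it);     // erase returns the next valid iterator
      else ++it;
    }
*/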

/*!
  Compute size of message containing global index
  \param[in] requestRecv request of message
*/
template<typename T, typename H>
int CClientClientDHTTemplate<T,H>::computeBuffCountIndex(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute size of message containing server index
  \param[in] requestRecv request of message
*/
template<typename T, typename H>
int CClientClientDHTTemplate<T,H>::computeBuffCountInfo(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_CHAR, &count);
  }

  return (count/infoTypeSize);
}

}