source: XIOS/trunk/src/client_client_dht.cpp @ 720

Last change on this file since 720 was 720, checked in by mhnguyen, 8 years ago

First implementation of hierarchical distributed hashed table

+) Implement dht for int with index of type size_t

Test
+) Local
+) Work correctly

File size: 25.2 KB
/*!
   \file client_client_dht.cpp
   \author Ha NGUYEN
   \since 15 Sep 2015
   \date 15 Sep 2015

   \brief Distributed hash table implementation.
 */
#include "client_client_dht.hpp"
#include <limits>
#include <cmath>
#include <boost/functional/hash.hpp>
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{

CClientClientDHT::CClientClientDHT(const boost::unordered_map<size_t,int>& indexInfoMap,
                                   const MPI_Comm& clientIntraComm, bool isDataDistributed, int hierarLvl)
  : intraCommRoot_(clientIntraComm), commLevel_(), isDataDistributed_(isDataDistributed),
    nbLevel_(hierarLvl), globalIndexToServerMapping_(), globalIndexToInfoMappingLevel_()
{
  computeMPICommLevel(clientIntraComm);
  int lvl = commLevel_.size() - 1;
  computeDistributedIndex(indexInfoMap, commLevel_[lvl], lvl);
}

CClientClientDHT::~CClientClientDHT()
{
}
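
// Illustrative usage sketch (comment only, not part of the build; variable names
// such as clientIntraComm are assumptions for the example):
//
//   boost::unordered_map<size_t,int> indexInfoMap;          // global index -> info (server rank)
//   indexInfoMap[42] = 3;                                    // hypothetical entry
//   CClientClientDHT dht(indexInfoMap, clientIntraComm,      // clientIntraComm: intra-communicator of all clients
//                        true /*isDataDistributed*/, 2 /*hierarLvl*/);
//   CArray<size_t,1> wanted(1);
//   wanted(0) = 42;
//   dht.computeServerIndexMapping(wanted);                   // resolves the owners through the hierarchy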

/*!
  Calculate the MPI communicator for each level of the hierarchy.
  \param[in] mpiCommRoot MPI communicator of level 0 (usually the communicator of all clients)
*/
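// Note on the level count: in computeMPICommLevel below, a requested hierarchy
// depth larger than the number of processes is reduced to std::log(nbProc)
// (natural logarithm, as written), and a value below 1 is raised to 1.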
void CClientClientDHT::computeMPICommLevel(const MPI_Comm& mpiCommRoot)
{
  int nbProc;
  MPI_Comm_size(mpiCommRoot,&nbProc);
  if (nbLevel_ > nbProc) nbLevel_ = std::log(nbProc);
  else if (1 > nbLevel_) nbLevel_ = 1;

  commLevel_.push_back(mpiCommRoot);
  divideMPICommLevel(mpiCommRoot, nbLevel_);
}

/*!
  Recursively divide each MPI communicator into sub-communicators.
  \param [in] mpiCommLevel MPI communicator of the current level
  \param [in] level current level
*/
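// Each recursion step splits the current communicator in two (color = clientRank % 2),
// so the hierarchy is a binary tree of communicators. For example, with 8 ranks and
// nbLevel_ = 3, commLevel_ ends up holding communicators of sizes 8, 4 and 2.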
void CClientClientDHT::divideMPICommLevel(const MPI_Comm& mpiCommLevel, int level)
{
  int clientRank;
  MPI_Comm_rank(mpiCommLevel,&clientRank);

  --level;
  if (0 < level)
  {
    int color = clientRank % 2;
    commLevel_.push_back(MPI_Comm());
    MPI_Comm_split(mpiCommLevel, color, 0, &(commLevel_.back()));
    divideMPICommLevel(commLevel_.back(), level);
  }
}

/*!
  Compute mapping between indices and information corresponding to these indices
  \param [in] indices indices a proc has
*/
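// After the lookup below, indexGlobalOnServer_ groups the requested global indices
// by the info value (server index) that owns them; it is presumably exposed to
// callers through the class interface declared in the header (not shown here).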
void CClientClientDHT::computeServerIndexMapping(const CArray<size_t,1>& indices)
{
  int lvl = commLevel_.size() - 1;
  computeIndexMapping(indices, commLevel_[lvl], lvl);
  size_t size = indices.numElements();
  for (size_t idx = 0; idx < size; ++idx)
  {
    int serverIdx = globalIndexToInfoMappingLevel_[indices(idx)];
    indexGlobalOnServer_[serverIdx].push_back(indices(idx));
  }
}

/*!
  Compute the mapping between indices and their corresponding information
  for each level of the hierarchical DHT. Recursive function.
  \param [in] indices indices a proc has
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
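// Informal outline of one level of the lookup implemented below:
//   1. Hash every requested global index and find its bucket owner in this level's
//      communicator with std::upper_bound on the hashedIndex bounds.
//   2. Two MPI_Allreduce calls tell each rank how many ranks will send to it and
//      how many indices in total it will receive.
//   3. Requested indices are sent with nonblocking sends; incoming requests are
//      collected with the probe/receive helpers further down in this file.
//   4. The indices received here are resolved either by recursing into the next
//      (smaller) communicator level or, at level 0, from the local
//      globalIndexToServerMapping_; the resulting info is sent back to the
//      requesting ranks and stored in globalIndexToInfoMappingLevel_.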
void CClientClientDHT::computeIndexMapping(const CArray<size_t,1>& indices,
                                           const MPI_Comm& commLevel,
                                           int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel,&nbClient);
  MPI_Comm_rank(commLevel,&clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  size_t ssize = indices.numElements(), hashedVal;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  std::map<int, std::vector<size_t> > client2ClientIndex;

  // Number of global indices whose mapped server can be found locally thanks to the index-server mapping
  int nbIndexAlreadyOnClient = 0;

  // Number of global indices whose mapped server is held by other clients
  int nbIndexSendToOthers = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t index = indices(i);
    hashedVal  = hashGlobalIndex(index);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedVal);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndex[indexClient].push_back(index);
        ++nbIndexSendToOthers;
      }
    }
  }
  info << "level " << level << " nbIndexsendtoOther " << nbIndexSendToOthers << std::endl;

  int* sendBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndex.begin(), it,
                                                ite  = client2ClientIndex.end();
  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
    for (it = itb; it != ite; ++it)
      sendIndexToClients(it->first, it->second, commLevel, sendRequest);

  int nbDemandingClient = recvBuff[clientRank], nbIndexServerReceived = 0;

  // Receive the demands as well as the responses from other clients.
  // A demand message contains global indices; a response carries the corresponding server index information.
  // The buffer receiving demands from other clients is only allocated if there are any demands.
  // In some cases duplicate indices are demanded, so the maximum size of the demand buffer must be determined.
  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);

  unsigned long* recvBuffIndex = 0;
  int maxNbIndexDemandedFromOthers = recvBuff[clientRank];
//  if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * nbIndexSendToOthers; //globalIndexToServerMapping_.size(); // Not very optimal but it's general

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndex = new unsigned long[maxNbIndexDemandedFromOthers];

  // The buffer receiving responses from other clients is only allocated if this client has made any demands.
  int* recvBuffInfo = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffInfo = new int[nbIndexReceivedFromOthers];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;

  // Count of elements already stored in the receiving buffer for global indices
  int countIndex = 0;

  // Requests returned by MPI_Irecv for global index messages
  std::map<int, MPI_Request> requestRecvIndex;

  // Maps a client rank to the beginning position, in the receiving buffer, of the global index message from this client
  std::map<int, unsigned long*> indexBuffBegin;

  std::map<int,std::vector<size_t> > src2Index; // Temporary mapping containing the demands (source rank and associated indices) at the current level

  CArray<size_t,1> tmpGlobalIndexOnClient(maxNbIndexDemandedFromOthers);
  int k = 0;

  while ((0 < nbDemandingClient) || (!sendRequest.empty()))
  {
    // Just check whether this client has any demand from other clients.
    // If it does, it should send responses to these client(s).
    probeIndexMessageFromClients(recvBuffIndex, maxNbIndexDemandedFromOthers,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndex.begin();
           itRequest != requestRecvIndex.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexBuffBegin[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            src2Index[clientSourceRank].push_back(*(beginBuff+i));
            tmpGlobalIndexOnClient(k) = *(beginBuff+i);
            ++k;
          }
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i < demandAlreadyReceived.size(); ++i)
        requestRecvIndex.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendRequest);
  }

  if (0 < level)
  {
    --level;
    computeIndexMapping(tmpGlobalIndexOnClient, commLevel_[level], level);
  }
  else
    globalIndexToInfoMappingLevel_ = globalIndexToServerMapping_;

  std::map<int, std::vector<int> > client2ClientInfo;
  std::list<MPI_Request> sendInfoRequest;
  std::map<int, std::vector<size_t> >::iterator itbSrc2Idx = src2Index.begin(), itSrc2Idx,
                                                iteSrc2Idx = src2Index.end();
  for (itSrc2Idx = itbSrc2Idx; itSrc2Idx != iteSrc2Idx; ++itSrc2Idx)
  {
    int clientSourceRank = itSrc2Idx->first;
    std::vector<size_t>& srcIdx = itSrc2Idx->second;
    for (int idx = 0; idx < srcIdx.size(); ++idx)
    {
//      client2ClientInfo[clientSourceRank].push_back(globalIndexToServerMapping_[srcIdx[idx]]);
      client2ClientInfo[clientSourceRank].push_back(globalIndexToInfoMappingLevel_[srcIdx[idx]]);
    }
    sendInfoToClients(clientSourceRank, client2ClientInfo[clientSourceRank], commLevel, sendInfoRequest);
  }

  boost::unordered_map<size_t,int> indexToInfoMapping;

  // Count of elements already stored in the receiving buffer for server indices
  int countInfo = 0;

  std::map<int, MPI_Request> requestRecvInfo;

  // Maps a client rank to the beginning position, in the receiving buffer, of the server index message from this client
  std::map<int, int*> infoBuffBegin;

  while ((!sendInfoRequest.empty()) || (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    testSendRequest(sendInfoRequest);

    // In some cases, a client needs to listen for responses from other clients about server information;
    // with this information, the client can fill in its index-to-server map.
    probeInfoMessageFromClients(recvBuffInfo, nbIndexReceivedFromOthers,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequest = requestRecvInfo.begin();
         itRequest != requestRecvInfo.end();
         ++itRequest)
    {
      int flagInfo, count;
      MPI_Status statusInfo;

      MPI_Test(&(itRequest->second), &flagInfo, &statusInfo);
      if (true == flagInfo)
      {
        MPI_Get_count(&statusInfo, MPI_INT, &count);
        int clientSourceRank = statusInfo.MPI_SOURCE;
        int* beginBuff = infoBuffBegin[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndex[clientSourceRank];
        for (int i = 0; i < count; ++i)
        {
          indexToInfoMapping[globalIndexTmp[i]] = *(beginBuff+i);
//          globalIndexToServerMapping_[globalIndexTmp[i]] = *(beginBuff+i);
        }
        nbIndexServerReceived += count;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i < repondAlreadyReceived.size(); ++i)
      requestRecvInfo.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  globalIndexToInfoMappingLevel_ = indexToInfoMapping;
  info << "temp " << tmpGlobalIndexOnClient << std::endl;
  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndex;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffInfo;
  delete [] sendBuff;
  delete [] recvBuff;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned one range of this distribution.
*/
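// Illustrative example with small numbers (not the real size_t bounds): if the hash
// space were [0, 10) and nbClient = 3, the loop below would yield
// hashedIndex = {0, 3, 6, 10}, i.e. client 0 owns hashes [0,3), client 1 owns [3,6)
// and client 2 owns [6,10); std::upper_bound on these bounds minus one then gives
// the owning client of any hash value.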
void CClientClientDHT::computeHashIndex(std::vector<size_t>& hashedIndex, int nbClient)
{
  // Compute range of hash index for each client
  hashedIndex.resize(nbClient+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  hashedIndex[0] = 0;
  for (int i = 1; i < nbClient; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient;
    if (i < (nbHashIndexMax%nbClient)) ++nbHashIndex;
    hashedIndex[i] = hashedIndex[i-1] + nbHashIndex;
  }
  hashedIndex[nbClient] = nbHashIndexMax;
}

/*!
  Compute the distribution of the global indices among the clients of the current level.
  Each client already holds a piece of information and the indices attached to it.
  This information is redistributed among the processes by hashing the indices into the size_t space.
  After the redistribution, each client holds the rearranged indices and their corresponding information.
  \param [in] indexInfoMap index and its corresponding info (usually a server index)
  \param [in] commLevel communicator of the current level
  \param [in] level current level
*/
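// Informal summary of the build step below: every (index, info) pair is routed to
// the client owning the hash bucket of the index; two MPI_Allreduce calls give each
// client the number of senders and the total number of pairs it will receive; the
// pairs are exchanged with nonblocking sends and the probe/receive helpers; finally
// the procedure recurses on the next (smaller) communicator level until level 0,
// where the received pairs become globalIndexToServerMapping_.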
void CClientClientDHT::computeDistributedIndex(const boost::unordered_map<size_t,int>& indexInfoMap,
                                               const MPI_Comm& commLevel,
                                               int level)
{
  int nbClient, clientRank;
  MPI_Comm_size(commLevel,&nbClient);
  MPI_Comm_rank(commLevel,&clientRank);
  std::vector<size_t> hashedIndex;
  computeHashIndex(hashedIndex, nbClient);

  int* sendBuff = new int[nbClient];
  int* sendNbIndexBuff = new int[nbClient];
  for (int i = 0; i < nbClient; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute the size of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndex;
  std::map<int, std::vector<int> > client2ClientInfo;

  std::vector<size_t>::const_iterator itbClientHash = hashedIndex.begin(), itClientHash,
                                      iteClientHash = hashedIndex.end();
  boost::unordered_map<size_t,int>::const_iterator it  = indexInfoMap.begin(),
                                                   ite = indexInfoMap.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndex[indexClient].push_back(it->first);
        client2ClientInfo[indexClient].push_back(it->second);
      }
    }
  }

  // Calculate from how many clients each client receives a message.
  int* recvBuff = new int[nbClient];
  MPI_Allreduce(sendBuff, recvBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbClient = recvBuff[clientRank];

  // Calculate the size of the buffer for receiving messages
  int* recvNbIndexBuff = new int[nbClient];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient, MPI_INT, MPI_SUM, commLevel);
  int recvNbIndexCount = recvNbIndexBuff[clientRank];
  unsigned long* recvIndexBuff = new unsigned long[recvNbIndexCount];
  int* recvInfoBuff = new int[recvNbIndexCount];

  // If a client holds index-information pairs that do not belong to it (according to the hash partition),
  // it will send a message to the correct clients.
  // The contents of the message are the indices and their corresponding information.
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itIndex  = client2ClientIndex.begin(),
                                                iteIndex = client2ClientIndex.end();
  for (; itIndex != iteIndex; ++itIndex)
    sendIndexToClients(itIndex->first, itIndex->second, commLevel, sendRequest);
  std::map<int, std::vector<int> >::iterator itInfo  = client2ClientInfo.begin(),
                                             iteInfo = client2ClientInfo.end();
  for (; itInfo != iteInfo; ++itInfo)
    sendInfoToClients(itInfo->first, itInfo->second, commLevel, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndex, itRequestInfo;
  std::map<int, int> countBuffInfo, countBuffIndex;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient);

  // Count of elements already stored in the receiving buffer for global indices
  int countIndex = 0;

  // Count of elements already stored in the receiving buffer for server indices
  int countInfo = 0;

  // Requests returned by MPI_Irecv for global index and info messages
  std::map<int, MPI_Request> requestRecvIndex, requestRecvInfo;

  // Maps a client rank to the beginning position, in the receiving buffer, of the global index message from this client
  std::map<int, unsigned long*> indexBuffBegin;

  // Maps a client rank to the beginning position, in the receiving buffer, of the server index message from this client
  std::map<int, int*> infoBuffBegin;

  boost::unordered_map<size_t,int> indexToInfoMapping;

  // Now each client tries to listen to demands from the others.
  // When a message arrives, it is processed: the global indices and corresponding server are pushed into its map.
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexMessageFromClients(recvIndexBuff, recvNbIndexCount,
                                 countIndex, indexBuffBegin,
                                 requestRecvIndex, commLevel);
    // Process the completed requests
    for (itRequestIndex = requestRecvIndex.begin();
         itRequestIndex != requestRecvIndex.end();
         ++itRequestIndex)
    {
      int rank = itRequestIndex->first;
      int count = computeBuffCountIndexGlobal(itRequestIndex->second);
      if (0 != count)
        countBuffIndex[rank] = count;
    }

    probeInfoMessageFromClients(recvInfoBuff, recvNbIndexCount,
                                countInfo, infoBuffBegin,
                                requestRecvInfo, commLevel);
    for (itRequestInfo = requestRecvInfo.begin();
         itRequestInfo != requestRecvInfo.end();
         ++itRequestInfo)
    {
      int rank = itRequestInfo->first;
      int count = computeBuffCountIndexServer(itRequestInfo->second);
      if (0 != count)
        countBuffInfo[rank] = count;
    }

    for (std::map<int, int>::iterator it = countBuffIndex.begin();
                                      it != countBuffIndex.end(); ++it)
    {
      int rank = it->first;
      if ((countBuffInfo.end() != countBuffInfo.find(rank)) &&
          (countBuffIndex.end() != countBuffIndex.find(rank)))
      {
        int count = it->second;
        for (int i = 0; i < count; ++i)
          indexToInfoMapping.insert(std::make_pair<size_t,int>(*(indexBuffBegin[rank]+i),*(infoBuffBegin[rank]+i)));
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvInfo.erase(processedList[i]);
      requestRecvIndex.erase(processedList[i]);
      countBuffIndex.erase(processedList[i]);
      countBuffInfo.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexBuff;
  delete [] recvInfoBuff;

  // Recurse to the next level of the hierarchy
  if (0 < level)
  {
    --level;
    computeDistributedIndex(indexToInfoMapping, commLevel_[level], level);
  }
  else
    globalIndexToServerMapping_ = indexToInfoMapping;
}

/*!
  Probe and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients to fill in their maps.
  Each client probes its message queue; if a message is ready, it is received into the receiving buffer.
  \param [in] recvIndexBuff buffer dedicated to receiving global indices
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countIndex number of indices already received
  \param [in] indexBuffBegin beginning of the index buffer for each source rank
  \param [in] requestRecvIndex pending receive requests for indices
  \param [in] intraComm communicator
*/
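// The pattern below (shared with probeInfoMessageFromClients): MPI_Iprobe checks
// nonblockingly for a pending message with the MPI_DHT_INDEX tag, MPI_Get_count
// gives its size, and MPI_Irecv posts a nonblocking receive directly into the next
// free slot of the shared buffer; countIndex tracks how much of the buffer is
// already reserved.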
void CClientClientDHT::probeIndexMessageFromClients(unsigned long* recvIndexBuff,
                                                    const int recvNbIndexCount,
                                                    int& countIndex,
                                                    std::map<int, unsigned long*>& indexBuffBegin,
                                                    std::map<int, MPI_Request>& requestRecvIndex,
                                                    const MPI_Comm& intraComm)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global index
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX, intraComm, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndex < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexBuffBegin.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexBuff+countIndex));
    MPI_Irecv(recvIndexBuff+countIndex, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX, intraComm,
              &requestRecvIndex[statusIndexGlobal.MPI_SOURCE]);
    countIndex += count;
  }
}

/*!
  Probe and receive messages containing server indices from other clients.
  Each client can send a message of server indices to other clients to fill in their maps.
  Each client probes its message queue; if a message is ready, it is received into the receiving buffer.
  \param [in] recvInfoBuff buffer dedicated to receiving server indices
  \param [in] recvNbIndexCount size of the buffer
  \param [in] countInfo number of info values already received
  \param [in] infoBuffBegin beginning of the info buffer for each source rank
  \param [in] requestRecvInfo pending receive requests for info
  \param [in] intraComm communicator
*/
void CClientClientDHT::probeInfoMessageFromClients(int* recvInfoBuff,
                                                   const int recvNbIndexCount,
                                                   int& countInfo,
                                                   std::map<int, int*>& infoBuffBegin,
                                                   std::map<int, MPI_Request>& requestRecvInfo,
                                                   const MPI_Comm& intraComm)
{
  MPI_Status statusInfo;
  int flagInfo, count;

  // Probing for server index
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO, intraComm, &flagInfo, &statusInfo);
  if ((true == flagInfo) && (countInfo < recvNbIndexCount))
  {
    MPI_Get_count(&statusInfo, MPI_INT, &count);
    infoBuffBegin.insert(std::make_pair<int, int*>(statusInfo.MPI_SOURCE, recvInfoBuff+countInfo));
    MPI_Irecv(recvInfoBuff+countInfo, count, MPI_INT,
              statusInfo.MPI_SOURCE, MPI_DHT_INFO, intraComm,
              &requestRecvInfo[statusInfo.MPI_SOURCE]);

    countInfo += count;
  }
}

/*!
  Send a message containing indices to a destination client.
  \param [in] clientDestRank rank of the destination client
  \param [in] indices indices to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendIndex list of sending requests
*/
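// Note: MPI_Isend below sends directly from the caller's std::vector, so that vector
// (and the stored MPI_Request) must remain valid and unmodified until the request
// completes, e.g. as checked by testSendRequest().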
void CClientClientDHT::sendIndexToClients(int clientDestRank, std::vector<size_t>& indices,
                                          const MPI_Comm& clientIntraComm,
                                          std::list<MPI_Request>& requestSendIndex)
{
  MPI_Request request;
  requestSendIndex.push_back(request);
  MPI_Isend(&(indices)[0], (indices).size(), MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX, clientIntraComm, &(requestSendIndex.back()));
}

/*!
  Send a message containing information (server indices) to a destination client.
  \param [in] clientDestRank rank of the destination client
  \param [in] info server indices to send
  \param [in] clientIntraComm communication group of clients
  \param [in] requestSendInfo list of sending requests
*/
void CClientClientDHT::sendInfoToClients(int clientDestRank, std::vector<int>& info,
                                         const MPI_Comm& clientIntraComm,
                                         std::list<MPI_Request>& requestSendInfo)
{
  MPI_Request request;
  requestSendInfo.push_back(request);
  MPI_Isend(&(info)[0], (info).size(), MPI_INT,
            clientDestRank, MPI_DHT_INFO, clientIntraComm, &(requestSendInfo.back()));
}

/*!
  Check the status of the pending send requests and remove the completed ones.
  \param [in] sendRequest list of send requests to check
*/
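// Implementation note: each pass of the outer loop below removes at most one
// completed request; the inner loop breaks as soon as a completed request is found
// so that the erase does not invalidate the iterator in use. Requests still in
// flight simply remain in the list for a later call.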
void CClientClientDHT::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}

/*!
  Process a received request: push the global indices and server indices into the map.
  \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indices
  \param[in] buffIndexServer pointer to the beginning of the buffer containing server indices
  \param[in] count size of the received message
*/
//void CClientClientDHT::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
//{
//  for (int i = 0; i < count; ++i)
//    globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(*(buffIndexGlobal+i),*(buffIndexServer+i)));
//}

/*!
  Compute the size of a received message containing global indices.
  \param[in] requestRecv request of the message
*/
int CClientClientDHT::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute the size of a received message containing server indices.
  \param[in] requestRecv request of the message
*/
int CClientClientDHT::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

}