source: XIOS/trunk/src/client_server_mapping_distributed.cpp @ 720

Last change on this file since 720 was 720, checked in by mhnguyen, 6 years ago

First implementation of hierarchical distributed hashed table

+) Implement dht for int with index of type size_t

Test
+) Local
+) Work correctly

File size: 21.0 KB
/*!
   \file client_server_mapping_distributed.cpp
   \author Ha NGUYEN
   \since 27 Feb 2015
   \date 09 March 2015

   \brief Mapping between client index and server index.
   Clients pre-calculate all information of the server distribution.
 */
#include "client_server_mapping_distributed.hpp"
#include <limits>
#include <boost/functional/hash.hpp>
#include "utils.hpp"
#include "client_client_dht.hpp"
#include "mpi_tag.hpp"

namespace xios
{

CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                 const MPI_Comm& clientIntraComm, bool isDataDistributed)
  : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0),
    indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed),
    ccDHT_(0)
{
  clientIntraComm_ = clientIntraComm;
  MPI_Comm_size(clientIntraComm,&(nbClient_));
  MPI_Comm_rank(clientIntraComm,&clientRank_);
  computeHashIndex();

  ccDHT_ = new CClientClientDHT(globalIndexOfServer,
                                clientIntraComm,
                                isDataDistributed);
//  const boost::unordered_map<size_t,int>& globalIndexToServerMappingTmp = clientDht.getGlobalIndexServerMapping();
//  globalIndexToServerMapping_ = clientDht.getGlobalIndexServerMapping();



//  computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
}

CClientServerMappingDistributed::~CClientServerMappingDistributed()
{
  if (0 != ccDHT_) delete ccDHT_;
}

/*!
   Compute the mapping between the global indices held by the client and the servers they are sent to.
   \param [in] globalIndexOnClient global indices held by the client
*/
void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient)
{
  ccDHT_->computeServerIndexMapping(globalIndexOnClient);
  indexGlobalOnServer_ = ccDHT_->getGlobalIndexOnServer();

/*
  size_t ssize = globalIndexOnClient.numElements(), hashedIndex;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  // Number of global indices whose server mapping can be found locally thanks to the index-server mapping
  int nbIndexAlreadyOnClient = 0;

  // Number of global indices whose server mapping is held by other clients
  int nbIndexSendToOthers = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t globalIndexClient = globalIndexOnClient(i);
    hashedIndex  = hashGlobalIndex(globalIndexClient);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedIndex);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndexGlobal[indexClient].push_back(globalIndexClient);
        ++nbIndexSendToOthers;
      }
    }
  }

  int* sendBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndexGlobal.begin(), it,
                                                ite  = client2ClientIndexGlobal.end();
  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
      for (it = itb; it != ite; ++it)
         sendIndexGlobalToClients(it->first, it->second, clientIntraComm_, sendRequest);

  int nbDemandingClient = recvBuff[clientRank_], nbIndexServerReceived = 0;

  // Receive demands as well as responses from other clients
  // A demand message contains global indices, while a response carries the corresponding server indices
  // Buffer to receive demands from other clients; it may or may not be allocated depending on whether there are demands
    // In some cases duplicate indices are demanded, so the maximum size of the demand buffer must be determined
  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  unsigned long* recvBuffIndexGlobal = 0;
  int maxNbIndexDemandedFromOthers = recvBuff[clientRank_];
  if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * nbIndexSendToOthers; //globalIndexToServerMapping_.size(); // Not very optimal but it's general

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndexGlobal = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer to receive responses from other clients; it may or may not be allocated depending on whether this client has demands of its own
  int* recvBuffIndexServer = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffIndexServer = new int[nbIndexReceivedFromOthers];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;


  resetReceivingRequestAndCount();
  while ((0 < nbDemandingClient) || (!sendRequest.empty()) ||
         (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    // Check whether this client has received any demands from other clients.
    // If it has, it should send responses to those clients
    probeIndexGlobalMessageFromClients(recvBuffIndexGlobal, maxNbIndexDemandedFromOthers);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndexGlobal_.begin();
           itRequest != requestRecvIndexGlobal_.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexGlobalBuffBegin_[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            client2ClientIndexServer[clientSourceRank].push_back(globalIndexToServerMapping_[*(beginBuff+i)]);
          }
          sendIndexServerToClients(clientSourceRank, client2ClientIndexServer[clientSourceRank], clientIntraComm_, sendRequest);
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
        requestRecvIndexGlobal_.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendRequest);

    // In some cases, a client needs to listen for responses from other clients carrying server information.
    // With this information, the client can fill in its server-global index map.
    probeIndexServerMessageFromClients(recvBuffIndexServer, nbIndexReceivedFromOthers);
    for (itRequest = requestRecvIndexServer_.begin();
         itRequest != requestRecvIndexServer_.end();
         ++itRequest)
    {
      int flagIndexServer, count;
      MPI_Status statusIndexServer;

      MPI_Test(&(itRequest->second), &flagIndexServer, &statusIndexServer);
      if (true == flagIndexServer)
      {
        MPI_Get_count(&statusIndexServer, MPI_INT, &count);
        int clientSourceRank = statusIndexServer.MPI_SOURCE;
        int* beginBuff = indexServerBuffBegin_[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndexGlobal[clientSourceRank];
        for (int i = 0; i < count; ++i)
        {
          (indexGlobalOnServer_[*(beginBuff+i)]).push_back(globalIndexTmp[i]);
        }
        nbIndexServerReceived += count;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
      requestRecvIndexServer_.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndexGlobal;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffIndexServer;
  delete [] sendBuff;
  delete [] recvBuff;
*/
}
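
/*
  A minimal usage sketch from a client's point of view.  It assumes that the base-class
  accessor getGlobalIndexOnServer() exposes the map assigned above (server rank -> global
  indices), as suggested by the use of indexGlobalOnServer_; globalIndexOfServer,
  globalIndexOnClient and clientIntraComm are hypothetical inputs normally derived from
  the grid distribution.

    boost::unordered_map<size_t,int> globalIndexOfServer;  // local piece of the (global index -> server) map
    CArray<size_t,1> globalIndexOnClient;                  // global indices held by this client
    // ... fill both containers from the local distribution ...

    CClientServerMappingDistributed mapping(globalIndexOfServer, clientIntraComm, true);
    mapping.computeServerIndexMapping(globalIndexOnClient);
    const std::map<int, std::vector<size_t> >& indexOnServer = mapping.getGlobalIndexOnServer();
    // each entry (rank, indices) lists the global indices this client must send to server 'rank'
*/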

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned one range of this distribution.
*/
void CClientServerMappingDistributed::computeHashIndex()
{
  // Compute the range of hash indices for each client
  indexClientHash_.resize(nbClient_+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  indexClientHash_[0] = 0;
  for (int i = 1; i < nbClient_; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient_;
    if (i < (nbHashIndexMax%nbClient_)) ++nbHashIndex;
    indexClientHash_[i] = indexClientHash_[i-1] + nbHashIndex;
  }
  indexClientHash_[nbClient_] = nbHashIndexMax;
}
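
/*
  Illustration of the partition built above, using a scaled-down, hypothetical hash
  space of size 10 instead of std::numeric_limits<size_t>::max(), with nbClient_ = 3:

    indexClientHash_ = { 0, 3, 6, 10 }
    // client 0 owns hashed indices in [0,3), client 1 owns [3,6), client 2 owns [6,10]

  The owner of a hashed index is then found with the same upper_bound/distance pattern
  used in computeDistributedServerIndex:

    size_t hashedIndex = 5;
    std::vector<size_t>::const_iterator it =
        std::upper_bound(indexClientHash_.begin(), indexClientHash_.end(), hashedIndex);
    int owner = std::distance(indexClientHash_.begin(), it) - 1;  // owner == 1, since 5 lies in [3,6)
*/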

/*!
  Compute the distribution of global indices for the servers.
  Each client already holds a piece of information about global indices and the corresponding servers.
This information is redistributed over the size_t space, in which each client owns a specific range of indices.
After the redistribution, each client holds, for its range of indices, all necessary information about the servers.
  \param [in] globalIndexOfServer global indices and the corresponding servers
  \param [in] clientIntraComm intra-communicator of the clients joining the distribution process.
*/
void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                    const MPI_Comm& clientIntraComm)
{
  int* sendBuff = new int[nbClient_];
  int* sendNbIndexBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute the size of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  boost::unordered_map<size_t,int>::const_iterator it  = globalIndexOfServer.begin(),
                                                   ite = globalIndexOfServer.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndexGlobal[indexClient].push_back(it->first);
        client2ClientIndexServer[indexClient].push_back(it->second);
      }
    }
  }

  // Determine how many clients each client receives messages from.
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbClient = recvBuff[clientRank_];

  // Calculate the size of the buffers for receiving messages
  int* recvNbIndexBuff = new int[nbClient_];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbIndexCount = recvNbIndexBuff[clientRank_];
  unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount];
  int* recvIndexServerBuff = new int[recvNbIndexCount];

  // If a client holds information about global indices and servers which do not belong to it,
  // it sends a message to the clients that own them.
  // The message contains the global indices and their corresponding server indices
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itGlobal  = client2ClientIndexGlobal.begin(),
                                                iteGlobal = client2ClientIndexGlobal.end();
  for ( ;itGlobal != iteGlobal; ++itGlobal)
    sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest);
  std::map<int, std::vector<int> >::iterator itServer  = client2ClientIndexServer.begin(),
                                             iteServer = client2ClientIndexServer.end();
  for (; itServer != iteServer; ++itServer)
    sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer;
  std::map<int, int> countBuffIndexServer, countBuffIndexGlobal;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Make sure that, before listening for messages, all index counters and receiving requests have been reset
  resetReceivingRequestAndCount();

  // Now each client tries to listen for demands from the others.
  // If a message arrives, it is processed: the global indices and corresponding servers are pushed into the local map
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexGlobalMessageFromClients(recvIndexGlobalBuff, recvNbIndexCount);

    // Process completed requests
    for (itRequestIndexGlobal = requestRecvIndexGlobal_.begin();
         itRequestIndexGlobal != requestRecvIndexGlobal_.end();
         ++itRequestIndexGlobal)
    {
      int rank = itRequestIndexGlobal->first;
      int countIndexGlobal = computeBuffCountIndexGlobal(itRequestIndexGlobal->second);
      if (0 != countIndexGlobal)
        countBuffIndexGlobal[rank] = countIndexGlobal;
    }

    probeIndexServerMessageFromClients(recvIndexServerBuff, recvNbIndexCount);
    for (itRequestIndexServer = requestRecvIndexServer_.begin();
         itRequestIndexServer != requestRecvIndexServer_.end();
         ++itRequestIndexServer)
    {
      int rank = itRequestIndexServer->first;
      int countIndexServer = computeBuffCountIndexServer(itRequestIndexServer->second);
      if (0 != countIndexServer)
        countBuffIndexServer[rank] = countIndexServer;
    }

    for (std::map<int, int>::iterator it = countBuffIndexGlobal.begin();
                                      it != countBuffIndexGlobal.end(); ++it)
    {
      int rank = it->first;
      if (countBuffIndexServer.end() != countBuffIndexServer.find(rank))
      {
        processReceivedRequest(indexGlobalBuffBegin_[rank], indexServerBuffBegin_[rank], it->second);
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvIndexServer_.erase(processedList[i]);
      requestRecvIndexGlobal_.erase(processedList[i]);
      countBuffIndexGlobal.erase(processedList[i]);
      countBuffIndexServer.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexGlobalBuff;
  delete [] recvIndexServerBuff;
}
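
/*
  A small, hypothetical scenario for the redistribution above, with nbClient_ = 2
  (the hash values are invented for the sake of the example):

    client 0 holds (globalIndex 12 -> server 3), and hashGlobalIndex(12) falls in client 1's range;
    client 1 holds (globalIndex 40 -> server 7), and hashGlobalIndex(40) also falls in client 1's range.

    On client 0: client2ClientIndexGlobal[1] = {12}, client2ClientIndexServer[1] = {3},
                 so it calls sendIndexGlobalToClients(1, {12}, ...) and sendIndexServerToClients(1, {3}, ...).
    On client 1: the pair (40,7) is likewise bucketed for rank 1 (the code does not special-case
                 the local rank), so client 1 also sends to itself.  After probing, receiving and
                 calling processReceivedRequest, client 1's globalIndexToServerMapping_ contains
                 both (12,3) and (40,7), while client 0 receives nothing.
*/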

/*!
  Probe and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients to fill their maps.
Each client probes its message queue; if a message is ready, it is received into the receiving buffer
  \param [in] recvIndexGlobalBuff buffer dedicated to receiving global indices
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probe for global indices
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexGlobalBuffBegin_.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
    MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_,
              &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
    countIndexGlobal_ += count;
  }
}

/*!
  Probe and receive messages containing server indices from other clients.
  Each client can send a message of server indices to other clients to fill their maps.
Each client probes its message queue; if a message is ready, it is received into the receiving buffer
  \param [in] recvIndexServerBuff buffer dedicated to receiving server indices
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexServer;
  int flagIndexServer, count;

  // Probe for server indices
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO_0, clientIntraComm_, &flagIndexServer, &statusIndexServer);
  if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexServer, MPI_INT, &count);
    indexServerBuffBegin_.insert(std::make_pair<int, int*>(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
    MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
              statusIndexServer.MPI_SOURCE, MPI_DHT_INFO_0, clientIntraComm_,
              &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);

    countIndexServer_ += count;
  }
}
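
/*
  Both probe routines above follow the same non-blocking pattern: MPI_Iprobe on
  MPI_ANY_SOURCE to detect a pending message, MPI_Get_count to size it, then MPI_Irecv
  into the next free slot of a pre-allocated buffer, while remembering where each
  source's slot begins.  A stripped-down sketch of this pattern (buffer, offset,
  bufferSize, bufferBegin and requests are hypothetical local variables, not members
  of this class):

    int flag, count;
    MPI_Status status;
    MPI_Iprobe(MPI_ANY_SOURCE, tag, comm, &flag, &status);
    if (flag && (offset < bufferSize))
    {
      MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
      bufferBegin[status.MPI_SOURCE] = buffer + offset;              // remember the slot start per source
      MPI_Irecv(buffer + offset, count, MPI_UNSIGNED_LONG,
                status.MPI_SOURCE, tag, comm, &requests[status.MPI_SOURCE]);
      offset += count;
    }
*/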

/*!
  Send a message containing global indices to a client
  \param [in] clientDestRank rank of destination client
  \param [in] indexGlobal global indices to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexGlobal list of sending requests
*/
void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexGlobal)
{
  MPI_Request request;
  requestSendIndexGlobal.push_back(request);
  MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX_0, clientIntraComm, &(requestSendIndexGlobal.back()));
}

/*!
  Send a message containing server indices to a client
  \param [in] clientDestRank rank of destination client
  \param [in] indexServer server indices to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexServer list of sending requests
*/
void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexServer)
{
  MPI_Request request;
  requestSendIndexServer.push_back(request);
  MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT,
            clientDestRank, MPI_DHT_INFO_0, clientIntraComm, &(requestSendIndexServer.back()));
}

/*!
  Verify the status of pending send requests
  \param [in] sendRequest list of send requests to verify
*/
void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}
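
/*
  The two send routines above post an MPI_Isend and append its request to a std::list;
  these requests are then retired by calling testSendRequest from inside the exchange
  loop of computeDistributedServerIndex, interleaved with the probe/receive calls.
  A sketch of that calling pattern (destRank, indexGlobal and indexServer are
  hypothetical local data):

    std::list<MPI_Request> sendRequest;
    sendIndexGlobalToClients(destRank, indexGlobal, clientIntraComm_, sendRequest);
    sendIndexServerToClients(destRank, indexServer, clientIntraComm_, sendRequest);
    // ... inside the exchange loop:
    testSendRequest(sendRequest);   // MPI_Test each pending send, erase the completed ones
    // the loop keeps iterating until sendRequest.empty() and all expected data has arrived
*/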

/*!
  Process a received request, pushing the global indices and server indices into the map
  \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indices
  \param[in] buffIndexServer pointer to the beginning of the buffer containing server indices
  \param[in] count size of the received message
*/
void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
{
  for (int i = 0; i < count; ++i)
    globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(*(buffIndexGlobal+i),*(buffIndexServer+i)));
}

/*!
  Compute size of message containing global index
  \param[in] requestRecv request of message
*/
int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute size of message containing server index
  \param[in] requestRecv request of message
*/
int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

/*!
  Reset all receiving request maps and counters
*/
void CClientServerMappingDistributed::resetReceivingRequestAndCount()
{
  countIndexGlobal_ = countIndexServer_ = 0;
  requestRecvIndexGlobal_.clear();
  requestRecvIndexServer_.clear();
  indexGlobalBuffBegin_.clear();
  indexServerBuffBegin_.clear();
}

}