source: XIOS/trunk/src/client_server_mapping_distributed.cpp @ 827

Last change on this file since 827 was 721, checked in by mhnguyen, 8 years ago

Templated version of the distributed hash table

+) Implement the DHT in a more generic way so it works with different types of information
+) Some old DHT code is kept as a reference (it will be deleted soon)

Test
+) Local run, attached mode, 8 processes
+) test_remap passes and the result is correct

File size: 20.8 KB
/*!
   \file client_server_mapping_distributed.cpp
   \author Ha NGUYEN
   \since 27 Feb 2015
   \date 06 Oct 2015

   \brief Mapping between client indices and servers.
   Clients pre-calculate all information about the server distribution.
 */
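
// Illustrative usage sketch (a hedged example, not part of the original file; it
// relies only on members visible below, and the accessor exposing
// indexGlobalOnServer_ is declared elsewhere):
//
//   boost::unordered_map<size_t,int> globalIndexOfServer; // global index -> server index
//   CClientServerMappingDistributed mapping(globalIndexOfServer, clientIntraComm, true);
//   mapping.computeServerIndexMapping(globalIndexOnClient);
//   // The result (server index -> global indices) is stored in indexGlobalOnServer_.
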
#include "client_server_mapping_distributed.hpp"
#include <limits>
#include <boost/functional/hash.hpp>
#include "utils.hpp"
#include "mpi_tag.hpp"

namespace xios
{

CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                 const MPI_Comm& clientIntraComm, bool isDataDistributed)
  : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0),
    indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed),
    ccDHT_(0)
{
  clientIntraComm_ = clientIntraComm;
  MPI_Comm_size(clientIntraComm,&(nbClient_));
  MPI_Comm_rank(clientIntraComm,&clientRank_);
  computeHashIndex();

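  // The index-to-server lookup is delegated to the generic client-client DHT
  // constructed below; the older point-to-point exchange implemented in this
  // class (computeDistributedServerIndex and the commented-out block in
  // computeServerIndexMapping) is kept only as a reference.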
  ccDHT_ = new CClientClientDHTInt(globalIndexOfServer,
                                   clientIntraComm,
                                   isDataDistributed);

//  computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
}

CClientServerMappingDistributed::~CClientServerMappingDistributed()
{
  if (0 != ccDHT_) delete ccDHT_;
}

/*!
   Compute the mapping between the client's global indices and the servers they are sent to.
   \param [in] globalIndexOnClient global indices held by the client
*/
void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient)
{
  ccDHT_->computeIndexInfoMapping(globalIndexOnClient);
  indexGlobalOnServer_ = (ccDHT_->getInfoIndexMap());
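
  // indexGlobalOnServer_ now maps each server index to the list of global
  // indices, held by this client, that belong to that server. The block
  // commented out below is the previous hand-written exchange, kept for
  // reference until it is removed.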

/*
  size_t ssize = globalIndexOnClient.numElements(), hashedIndex;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  // Number of global indices whose mapping server can be found thanks to the index-server mapping
  int nbIndexAlreadyOnClient = 0;

  // Number of global indices whose mapping servers are on other clients
  int nbIndexSendToOthers = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t globalIndexClient = globalIndexOnClient(i);
    hashedIndex  = hashGlobalIndex(globalIndexClient);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedIndex);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndexGlobal[indexClient].push_back(globalIndexClient);
        ++nbIndexSendToOthers;
      }
    }
  }

  int* sendBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndexGlobal.begin(), it,
                                                ite  = client2ClientIndexGlobal.end();
  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
      for (it = itb; it != ite; ++it)
         sendIndexGlobalToClients(it->first, it->second, clientIntraComm_, sendRequest);

  int nbDemandingClient = recvBuff[clientRank_], nbIndexServerReceived = 0;

  // Receive demands as well as responses from other clients
  // A demand message contains global indices; a response carries the corresponding server indices
  // Buffer to receive demands from other clients; it may or may not be allocated depending on whether there are demands
  // In some cases duplicate indices are demanded, so the maximum size of the demand buffer must be determined
  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  unsigned long* recvBuffIndexGlobal = 0;
  int maxNbIndexDemandedFromOthers = recvBuff[clientRank_];
  if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * nbIndexSendToOthers; //globalIndexToServerMapping_.size(); // Not very optimal but it's general

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndexGlobal = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer to receive responses from other clients; it may or may not be allocated depending on whether this client has demands
  int* recvBuffIndexServer = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffIndexServer = new int[nbIndexReceivedFromOthers];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;


  resetReceivingRequestAndCount();
  while ((0 < nbDemandingClient) || (!sendRequest.empty()) ||
         (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    // Just check whether a client has any demand from other clients.
    // If it has, it should send responses to these client(s)
    probeIndexGlobalMessageFromClients(recvBuffIndexGlobal, maxNbIndexDemandedFromOthers);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndexGlobal_.begin();
           itRequest != requestRecvIndexGlobal_.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexGlobalBuffBegin_[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            client2ClientIndexServer[clientSourceRank].push_back(globalIndexToServerMapping_[*(beginBuff+i)]);
          }
          sendIndexServerToClients(clientSourceRank, client2ClientIndexServer[clientSourceRank], clientIntraComm_, sendRequest);
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
        requestRecvIndexGlobal_.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendRequest);

    // In some cases, a client needs to listen for responses from other clients about server information
    // With this information, the client can fill in its server-global index map.
    probeIndexServerMessageFromClients(recvBuffIndexServer, nbIndexReceivedFromOthers);
    for (itRequest = requestRecvIndexServer_.begin();
         itRequest != requestRecvIndexServer_.end();
         ++itRequest)
    {
      int flagIndexServer, count;
      MPI_Status statusIndexServer;

      MPI_Test(&(itRequest->second), &flagIndexServer, &statusIndexServer);
      if (true == flagIndexServer)
      {
        MPI_Get_count(&statusIndexServer, MPI_INT, &count);
        int clientSourceRank = statusIndexServer.MPI_SOURCE;
        int* beginBuff = indexServerBuffBegin_[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndexGlobal[clientSourceRank];
        for (int i = 0; i < count; ++i)
        {
          (indexGlobalOnServer_[*(beginBuff+i)]).push_back(globalIndexTmp[i]);
        }
        nbIndexServerReceived += count;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
      requestRecvIndexServer_.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndexGlobal;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffIndexServer;
  delete [] sendBuff;
  delete [] recvBuff;
*/
}

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned a range of this distribution
*/
void CClientServerMappingDistributed::computeHashIndex()
{
  // Compute range of hash index for each client
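  // Client i owns the hashed indices in [indexClientHash_[i], indexClientHash_[i+1]);
  // each range is roughly nbHashIndexMax/nbClient_ wide and the last range always
  // ends at nbHashIndexMax.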
  indexClientHash_.resize(nbClient_+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  indexClientHash_[0] = 0;
  for (int i = 1; i < nbClient_; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient_;
    if (i < (nbHashIndexMax%nbClient_)) ++nbHashIndex;
    indexClientHash_[i] = indexClientHash_[i-1] + nbHashIndex;
  }
  indexClientHash_[nbClient_] = nbHashIndexMax;
}

/*!
  Compute the distribution of global indices over the servers.
  Each client already holds a piece of information about global indices and their corresponding servers.
This information is redistributed over the size_t space, in which each client owns a specific range of indices.
After the redistribution, each client holds, for its range of indices, all the necessary information about the servers.
  \param [in] globalIndexOfServer global indices and their corresponding servers
  \param [in] clientIntraComm client communicator joining the distribution process.
*/
void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                    const MPI_Comm& clientIntraComm)
{
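  // Overview of the exchange: each (global index, server index) pair is hashed,
  // then sent to the client whose hash range contains the hashed value; each
  // client finally stores the pairs it owns in globalIndexToServerMapping_.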
  int* sendBuff = new int[nbClient_];
  int* sendNbIndexBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute sizes of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  boost::unordered_map<size_t,int>::const_iterator it  = globalIndexOfServer.begin(),
                                                   ite = globalIndexOfServer.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndexGlobal[indexClient].push_back(it->first);
        client2ClientIndexServer[indexClient].push_back(it->second);
      }
    }
  }

  // Calculate from how many clients each client receives messages.
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbClient = recvBuff[clientRank_];

  // Calculate the size of the buffers for receiving messages
  int* recvNbIndexBuff = new int[nbClient_];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbIndexCount = recvNbIndexBuff[clientRank_];
  unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount];
  int* recvIndexServerBuff = new int[recvNbIndexCount];

  // If a client holds information about global indices and servers that do not belong to it,
  // it will send a message to the correct clients.
  // The message contains the global indices and their corresponding server indices
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itGlobal  = client2ClientIndexGlobal.begin(),
                                                iteGlobal = client2ClientIndexGlobal.end();
  for ( ;itGlobal != iteGlobal; ++itGlobal)
    sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest);
  std::map<int, std::vector<int> >::iterator itServer  = client2ClientIndexServer.begin(),
                                             iteServer = client2ClientIndexServer.end();
  for (; itServer != iteServer; ++itServer)
    sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer;
  std::map<int, int> countBuffIndexServer, countBuffIndexGlobal;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Make sure that, before listening for messages, all index counters and receiving requests have been reset
  resetReceivingRequestAndCount();

  // Now each client tries to listen for demands from the others.
  // If a message arrives, it is processed: the global indices and corresponding servers are pushed into the client's map
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexGlobalMessageFromClients(recvIndexGlobalBuff, recvNbIndexCount);

    // Process completed requests
    for (itRequestIndexGlobal = requestRecvIndexGlobal_.begin();
         itRequestIndexGlobal != requestRecvIndexGlobal_.end();
         ++itRequestIndexGlobal)
    {
      int rank = itRequestIndexGlobal->first;
      int countIndexGlobal = computeBuffCountIndexGlobal(itRequestIndexGlobal->second);
      if (0 != countIndexGlobal)
        countBuffIndexGlobal[rank] = countIndexGlobal;
    }

    probeIndexServerMessageFromClients(recvIndexServerBuff, recvNbIndexCount);
    for (itRequestIndexServer = requestRecvIndexServer_.begin();
         itRequestIndexServer != requestRecvIndexServer_.end();
         ++itRequestIndexServer)
    {
      int rank = itRequestIndexServer->first;
      int countIndexServer = computeBuffCountIndexServer(itRequestIndexServer->second);
      if (0 != countIndexServer)
        countBuffIndexServer[rank] = countIndexServer;
    }

    for (std::map<int, int>::iterator it = countBuffIndexGlobal.begin();
                                      it != countBuffIndexGlobal.end(); ++it)
    {
      int rank = it->first;
      if (countBuffIndexServer.end() != countBuffIndexServer.find(rank))
      {
        processReceivedRequest(indexGlobalBuffBegin_[rank], indexServerBuffBegin_[rank], it->second);
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvIndexServer_.erase(processedList[i]);
      requestRecvIndexGlobal_.erase(processedList[i]);
      countBuffIndexGlobal.erase(processedList[i]);
      countBuffIndexServer.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexGlobalBuff;
  delete [] recvIndexServerBuff;
}

/*!
  Probe and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients to fulfill their maps.
Each client probes for messages in its queue; when a message is ready, it is received into the dedicated buffer
  \param [in] recvIndexGlobalBuff buffer dedicated to receiving global indices
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global index
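  // Non-blocking pattern: MPI_Iprobe detects a pending message, its size is read
  // with MPI_Get_count, the start of the per-sender slice inside the shared
  // receive buffer is recorded in indexGlobalBuffBegin_, and the matching
  // MPI_Irecv is posted; countIndexGlobal_ tracks how much of the buffer is used.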
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexGlobalBuffBegin_.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
    MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, MPI_DHT_INDEX_0, clientIntraComm_,
              &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
    countIndexGlobal_ += count;
  }
}

/*!
  Probe and receive messages containing server indices from other clients.
  Each client can send a message of server indices to other clients to fulfill their maps.
Each client probes for messages in its queue; when a message is ready, it is received into the dedicated buffer
  \param [in] recvIndexServerBuff buffer dedicated to receiving server indices
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexServer;
  int flagIndexServer, count;

  // Probing for server index
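  // Same non-blocking probe/receive pattern as for global indices, but using the
  // MPI_DHT_INFO_0 tag and the countIndexServer_/indexServerBuffBegin_ bookkeeping.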
  MPI_Iprobe(MPI_ANY_SOURCE, MPI_DHT_INFO_0, clientIntraComm_, &flagIndexServer, &statusIndexServer);
  if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexServer, MPI_INT, &count);
    indexServerBuffBegin_.insert(std::make_pair<int, int*>(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
    MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
              statusIndexServer.MPI_SOURCE, MPI_DHT_INFO_0, clientIntraComm_,
              &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);

    countIndexServer_ += count;
  }
}

/*!
  Send a message containing global indices to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexGlobal global indices to send
  \param [in] clientIntraComm client intra-communicator
  \param [in] requestSendIndexGlobal list of send requests
*/
void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexGlobal)
{
  MPI_Request request;
  requestSendIndexGlobal.push_back(request);
  MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG,
            clientDestRank, MPI_DHT_INDEX_0, clientIntraComm, &(requestSendIndexGlobal.back()));
}

/*!
  Send a message containing server indices to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexServer server indices to send
  \param [in] clientIntraComm client intra-communicator
  \param [in] requestSendIndexServer list of send requests
*/
void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexServer)
{
  MPI_Request request;
  requestSendIndexServer.push_back(request);
  MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT,
            clientDestRank, MPI_DHT_INFO_0, clientIntraComm, &(requestSendIndexServer.back()));
}

/*!
  Check the status of pending send requests and remove completed ones from the list
  \param [in] sendRequest list of send requests to check
*/
void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest)
{
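  // Scan the list repeatedly: each pass removes at most one request that
  // MPI_Test reports as completed, and at most sendRequest.size() passes are made.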
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}

/*!
  Process a received request: push pairs of global index and server index into the map
  \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indices
  \param[in] buffIndexServer pointer to the beginning of the buffer containing server indices
  \param[in] count size of the received message
*/
void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
{
  for (int i = 0; i < count; ++i)
    globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(*(buffIndexGlobal+i),*(buffIndexServer+i)));
}

/*!
  Compute the size of a received message containing global indices
  \param[in] requestRecv receive request of the message
*/
int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute the size of a received message containing server indices
  \param[in] requestRecv receive request of the message
*/
int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

/*!
  Reset all receiving request maps and counters
*/
void CClientServerMappingDistributed::resetReceivingRequestAndCount()
{
  countIndexGlobal_ = countIndexServer_ = 0;
  requestRecvIndexGlobal_.clear();
  requestRecvIndexServer_.clear();
  indexGlobalBuffBegin_.clear();
  indexServerBuffBegin_.clear();
}

}