source: XIOS/trunk/src/client_server_mapping_distributed.cpp @ 623

Last change on this file since 623 was 623, checked in by mhnguyen, 9 years ago

Implementing transformation algorithm: zoom axis (local commit)

+) Implement zoom axis: zoomed points are the points that are not masked
+) Correct some minor bugs

Test
+) Ok with normal cases: zoom as the last transformation in the list
+) There is still a bug when zoom is followed by inverse

File size: 20.7 KB
/*!
   \file client_server_mapping_distributed.cpp
   \author Ha NGUYEN
   \since 27 Feb 2015
   \date 09 March 2015

   \brief Mapping between client and server indexes.
   Clients pre-calculate all information about the server index distribution.
 */
#include "client_server_mapping_distributed.hpp"
#include <limits>
#include <boost/functional/hash.hpp>
#include "utils.hpp"

namespace xios
{

CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                 const MPI_Comm& clientIntraComm, bool isDataDistributed)
  : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0),
    indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed)
{
  clientIntraComm_ = clientIntraComm;
  MPI_Comm_size(clientIntraComm,&(nbClient_));
  MPI_Comm_rank(clientIntraComm,&clientRank_);
  computeHashIndex();
  computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
}
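
// A minimal usage sketch (illustrative only; the variable names below are
// hypothetical and not part of XIOS):
//
//   boost::unordered_map<size_t,int> globalIndexOfServer; // filled by the caller
//   CClientServerMappingDistributed mapping(globalIndexOfServer, clientIntraComm, true);
//   mapping.computeServerIndexMapping(globalIndexOnClient); // CArray<size_t,1> of the client's global indexes
//
// After the call, indexGlobalOnServer_ associates each destination server rank with
// the global indexes this client has to send to it.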

CClientServerMappingDistributed::~CClientServerMappingDistributed()
{
}

/*!
   Compute the mapping between the global indexes held by this client and the servers they must be sent to.
   \param [in] globalIndexOnClient global indexes held by the client
*/
void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient)
{
  size_t ssize = globalIndexOnClient.numElements(), hashedIndex;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  // Number of global indexes whose server can be resolved locally from this client's index-server mapping
  int nbIndexAlreadyOnClient = 0;

  // Number of global indexes whose server mapping is held by other clients
  int nbIndexSendToOthers = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t globalIndexClient = globalIndexOnClient(i);
    hashedIndex  = hashGlobalIndex(globalIndexClient);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedIndex);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;

      if (clientRank_ == indexClient)
      {
        (indexGlobalOnServer_[globalIndexToServerMapping_[globalIndexClient]]).push_back(globalIndexClient);
        ++nbIndexAlreadyOnClient;
      }
      else
      {
        client2ClientIndexGlobal[indexClient].push_back(globalIndexClient);
        ++nbIndexSendToOthers;
      }
    }
  }

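  // Tell every client how many other clients will demand indexes from it: each sender
  // marks a 1 at the position of every destination rank in sendBuff, and the element-wise
  // MPI_Allreduce sum at position clientRank_ gives the number of demanding clients
  // this client will have to answer.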
  int* sendBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator it  = client2ClientIndexGlobal.begin(),
                                                ite = client2ClientIndexGlobal.end();
  for (; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
      for (it = client2ClientIndexGlobal.begin(); it != ite; ++it)
         sendIndexGlobalToClients(it->first, it->second, clientIntraComm_, sendRequest);

  int nbDemandingClient = recvBuff[clientRank_], nbIndexServerReceived = 0;
  // Receive both the demands and the responses from other clients:
  // a demand message carries global indexes, while a response carries the matching server indexes.
  // Buffer for incoming demands; it is allocated only if this client may actually receive demands
  unsigned long* recvBuffIndexGlobal = 0;
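  // Size the demand buffer with a worst-case bound: every entry this client owns in
  // globalIndexToServerMapping_ that was not already resolved locally may still be
  // demanded by another client (each at most once when the data are distributed).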
  int maxNbIndexDemandedFromOthers = (nbIndexAlreadyOnClient >= globalIndexToServerMapping_.size())
                                   ? 0 : (globalIndexToServerMapping_.size() - nbIndexAlreadyOnClient);
  if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * globalIndexToServerMapping_.size(); // Not very optimal but it's general

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndexGlobal = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer for incoming responses; it is allocated only if this client has demanded indexes from other clients
  int* recvBuffIndexServer = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
//  int nbIndexReceivedFromOthers = globalIndexToServerMapping_.size() - nbIndexAlreadyOnClient;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffIndexServer = new int[nbIndexReceivedFromOthers];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;


  resetReceivingRequestAndCount();
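  // Event loop: keep iterating while (1) some demanding clients have not been answered yet,
  // (2) some of this client's sends are still pending, or (3) responses carrying the
  // requested server indexes are still expected.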
  while ((0 < nbDemandingClient) || (!sendRequest.empty()) ||
         (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    // Check whether this client has received any demand from other clients.
    // If it has, it answers each of them with the requested server indexes.
    probeIndexGlobalMessageFromClients(recvBuffIndexGlobal, maxNbIndexDemandedFromOthers);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndexGlobal_.begin();
           itRequest != requestRecvIndexGlobal_.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexGlobalBuffBegin_[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            client2ClientIndexServer[clientSourceRank].push_back(globalIndexToServerMapping_[*(beginBuff+i)]);
          }
          sendIndexServerToClients(clientSourceRank, client2ClientIndexServer[clientSourceRank], clientIntraComm_, sendRequest);
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i < demandAlreadyReceived.size(); ++i)
        requestRecvIndexGlobal_.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendRequest);

    // A client that demanded indexes must also listen for the responses carrying the server information.
    // With that information it can fill in its server-to-global-index map.
    probeIndexServerMessageFromClients(recvBuffIndexServer, nbIndexReceivedFromOthers);
    for (itRequest = requestRecvIndexServer_.begin();
         itRequest != requestRecvIndexServer_.end();
         ++itRequest)
    {
      int flagIndexServer, count;
      MPI_Status statusIndexServer;

      MPI_Test(&(itRequest->second), &flagIndexServer, &statusIndexServer);
      if (true == flagIndexServer)
      {
        MPI_Get_count(&statusIndexServer, MPI_INT, &count);
        int clientSourceRank = statusIndexServer.MPI_SOURCE;
        int* beginBuff = indexServerBuffBegin_[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndexGlobal[clientSourceRank];
        for (int i = 0; i < count; ++i)
        {
          (indexGlobalOnServer_[*(beginBuff+i)]).push_back(globalIndexTmp[i]);
        }
        nbIndexServerReceived += count;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i < repondAlreadyReceived.size(); ++i)
      requestRecvIndexServer_.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndexGlobal;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffIndexServer;
  delete [] sendBuff;
  delete [] recvBuff;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned one contiguous range of this distribution
*/
void CClientServerMappingDistributed::computeHashIndex()
{
  // Compute range of hash index for each client
  indexClientHash_.resize(nbClient_+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  indexClientHash_[0] = 0;
  for (int i = 1; i < nbClient_; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient_;
    if (i < (nbHashIndexMax%nbClient_)) ++nbHashIndex;
    indexClientHash_[i] = indexClientHash_[i-1] + nbHashIndex;
  }
  indexClientHash_[nbClient_] = nbHashIndexMax;
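  // Resulting invariant: client i is responsible for hashed indexes in the half-open
  // range [indexClientHash_[i], indexClientHash_[i+1]). For example (hypothetical), with
  // nbClient_ == 4 the boundaries are roughly 0, max/4, max/2, 3*max/4 and max.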
}

/*!
  Compute the distribution of the global-index-to-server information among clients.
  Each client initially holds a piece of the mapping between global indexes and the corresponding servers.
This information is redistributed over the hashed size_t space, in which each client owns a specific range of indexes.
After the redistribution, each client holds, for its own range of indexes, all the information needed to locate the servers.
  \param [in] globalIndexOfServer global index and the corresponding server
  \param [in] clientIntraComm client intra-communicator used for the distribution process.
*/
void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                    const MPI_Comm& clientIntraComm)
{
  int* sendBuff = new int[nbClient_];
  int* sendNbIndexBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute the size of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  boost::unordered_map<size_t,int>::const_iterator it  = globalIndexOfServer.begin(),
                                                   ite = globalIndexOfServer.end();
  HashXIOS<size_t> hashGlobalIndex;
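  // Redistribute the (global index, server) pairs over the hashed space: each pair is
  // assigned to the client whose hash range contains the hashed global index. Pairs owned
  // by this client go straight into globalIndexToServerMapping_; the others are queued
  // per destination client and sent below.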
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      if (clientRank_ == indexClient)
      {
        globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(it->first, it->second));
      }
      else
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndexGlobal[indexClient].push_back(it->first);
        client2ClientIndexServer[indexClient].push_back(it->second);
      }
    }
  }

  // Work out how many clients will send a message to this client
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbClient = recvBuff[clientRank_];

  // Work out the size of the buffers needed to receive those messages
  int* recvNbIndexBuff = new int[nbClient_];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbIndexCount = recvNbIndexBuff[clientRank_];
  unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount];
  int* recvIndexServerBuff = new int[recvNbIndexCount];

  // If a client holds (global index, server) pairs that belong to other clients,
  // it sends them to the owning clients.
  // Each message carries the global indexes and their corresponding server indexes
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itGlobal  = client2ClientIndexGlobal.begin(),
                                                iteGlobal = client2ClientIndexGlobal.end();
  for ( ;itGlobal != iteGlobal; ++itGlobal)
    sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest);
  std::map<int, std::vector<int> >::iterator itServer  = client2ClientIndexServer.begin(),
                                             iteServer = client2ClientIndexServer.end();
  for (; itServer != iteServer; ++itServer)
    sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer;
  std::map<int, int> countBuffIndexServer, countBuffIndexGlobal;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Make sure all index counters and receiving requests have been reset before listening for messages
  resetReceivingRequestAndCount();

  // Each client now listens for messages from the others.
  // When a message arrives, it is processed: the global indexes and their corresponding servers are pushed into the local map
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexGlobalMessageFromClients(recvIndexGlobalBuff, recvNbIndexCount);

    // Process completed receive requests
    for (itRequestIndexGlobal = requestRecvIndexGlobal_.begin();
         itRequestIndexGlobal != requestRecvIndexGlobal_.end();
         ++itRequestIndexGlobal)
    {
      int rank = itRequestIndexGlobal->first;
      int countIndexGlobal = computeBuffCountIndexGlobal(itRequestIndexGlobal->second);
      if (0 != countIndexGlobal)
        countBuffIndexGlobal[rank] = countIndexGlobal;
    }

    probeIndexServerMessageFromClients(recvIndexServerBuff, recvNbIndexCount);
    for (itRequestIndexServer = requestRecvIndexServer_.begin();
         itRequestIndexServer != requestRecvIndexServer_.end();
         ++itRequestIndexServer)
    {
      int rank = itRequestIndexServer->first;
      int countIndexServer = computeBuffCountIndexServer(itRequestIndexServer->second);
      if (0 != countIndexServer)
        countBuffIndexServer[rank] = countIndexServer;
    }

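    // A rank is processed only once both of its messages have arrived: the global-index
    // message and the matching server-index message are consumed together so that the
    // (global index, server index) pairs can be rebuilt element by element.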
    for (std::map<int, int>::iterator it = countBuffIndexGlobal.begin();
                                      it != countBuffIndexGlobal.end(); ++it)
    {
      int rank = it->first;
      if (countBuffIndexServer.end() != countBuffIndexServer.find(rank))
      {
        processReceivedRequest(indexGlobalBuffBegin_[rank], indexServerBuffBegin_[rank], it->second);
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvIndexServer_.erase(processedList[i]);
      requestRecvIndexGlobal_.erase(processedList[i]);
      countBuffIndexGlobal.erase(processedList[i]);
      countBuffIndexServer.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexGlobalBuff;
  delete [] recvIndexServerBuff;
}

/*!
  Probe and receive messages containing global indexes from other clients.
  Each client may send global indexes to other clients so that they can fill their maps.
Each client probes its message queue and, when a message is ready, receives it into the dedicated buffer.
  \param [in] recvIndexGlobalBuff buffer dedicated to receiving global indexes
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global index
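  // (tag 15 matches the tag used by sendIndexGlobalToClients for global-index messages)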
  MPI_Iprobe(MPI_ANY_SOURCE, 15, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexGlobalBuffBegin_.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
    MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, 15, clientIntraComm_,
              &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
    countIndexGlobal_ += count;
  }
}

/*!
  Probe and receive messages containing server indexes from other clients.
  Each client may send server indexes to other clients so that they can fill their maps.
Each client probes its message queue and, when a message is ready, receives it into the dedicated buffer.
  \param [in] recvIndexServerBuff buffer dedicated to receiving server indexes
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexServer;
  int flagIndexServer, count;

  // Probing for server index
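  // (tag 12 matches the tag used by sendIndexServerToClients for server-index messages)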
  MPI_Iprobe(MPI_ANY_SOURCE, 12, clientIntraComm_, &flagIndexServer, &statusIndexServer);
  if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexServer, MPI_INT, &count);
    indexServerBuffBegin_.insert(std::make_pair<int, int*>(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
    MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
              statusIndexServer.MPI_SOURCE, 12, clientIntraComm_,
              &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);

    countIndexServer_ += count;
  }
}

/*!
  Send a message containing global indexes to another client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexGlobal global indexes to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexGlobal list of sending requests
*/
void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexGlobal)
{
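  // The request object is pushed into the list first so that MPI_Isend can write the
  // actual request handle into an element that stays alive in the caller's list.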
  MPI_Request request;
  requestSendIndexGlobal.push_back(request);
  MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG,
            clientDestRank, 15, clientIntraComm, &(requestSendIndexGlobal.back()));
}

/*!
  Send a message containing server indexes to another client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexServer server indexes to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexServer list of sending requests
*/
void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexServer)
{
  MPI_Request request;
  requestSendIndexServer.push_back(request);
  MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT,
            clientDestRank, 12, clientIntraComm, &(requestSendIndexServer.back()));
}

/*!
  Check the status of the pending send requests and remove those that have completed
  \param [in] sendRequest list of send requests to check
*/
void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
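  // Scan the list repeatedly; each pass erases at most one completed request, because
  // erasing invalidates the iterator used by the inner loop.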
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}

/*!
  Process a received request: push the (global index, server index) pairs into the map
  \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indexes
  \param[in] buffIndexServer pointer to the beginning of the buffer containing server indexes
  \param[in] count size of the received message
*/
void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
{
  for (int i = 0; i < count; ++i)
    globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(*(buffIndexGlobal+i),*(buffIndexServer+i)));
}

/*!
  Compute the size of a received message containing global indexes
  \param[in] requestRecv receive request of the message
  \return number of elements received (0 if the message has not yet completed)
*/
int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute the size of a received message containing server indexes
  \param[in] requestRecv receive request of the message
  \return number of elements received (0 if the message has not yet completed)
*/
int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

/*!
  Reset all receiving request maps and counters
*/
void CClientServerMappingDistributed::resetReceivingRequestAndCount()
{
  countIndexGlobal_ = countIndexServer_ = 0;
  requestRecvIndexGlobal_.clear();
  requestRecvIndexServer_.clear();
  indexGlobalBuffBegin_.clear();
  indexServerBuffBegin_.clear();
}

}