source: XIOS/trunk/src/client_server_mapping_distributed.cpp @ 593

Last change on this file since 593 was 585, checked in by mhnguyen, 9 years ago

Modifying some functions to make sure zoom works even with a non-distributed grid

+) Change some code in sendIndex to make sure a non-distributed grid works with zoom

Test
+) On Curie
+) test_client: passed and results are the same as on the branches
+) test_complete: there is a difference in output because of the zoom index offset

File size: 20.9 KB
/*!
   \file client_server_mapping_distributed.cpp
   \author Ha NGUYEN
   \since 27 Feb 2015
   \date 09 March 2015

   \brief Mapping between client indexes and server indexes.
   Clients pre-calculate all information of the server distribution.
 */
#include "client_server_mapping_distributed.hpp"
#include <limits>
#include <boost/functional/hash.hpp>
#include "utils.hpp"

namespace xios
{

CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                 const MPI_Comm& clientIntraComm, bool isDataDistributed)
  : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0),
    indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed)
{
  clientIntraComm_ = clientIntraComm;
  MPI_Comm_size(clientIntraComm,&(nbClient_));
  MPI_Comm_rank(clientIntraComm,&clientRank_);
  computeHashIndex();
  computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
}

CClientServerMappingDistributed::~CClientServerMappingDistributed()
{
}
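
/*!
  Usage sketch (illustrative only): a minimal example of how this class is typically driven,
  assuming the caller already owns a client intra-communicator and a partial global-index/server
  map. The variable names below are hypothetical; CArray is the XIOS array wrapper.
  \code
  // boost::unordered_map<size_t,int> globalIndexOfServer; // this client's piece of the index->server map
  // MPI_Comm clientIntraComm;                             // intra-communicator gathering all clients
  // CArray<size_t,1> globalIndexOnClient;                 // global indexes actually held by this client

  CClientServerMappingDistributed mapping(globalIndexOfServer, clientIntraComm, true);
  mapping.computeServerIndexMapping(globalIndexOnClient);
  // Afterwards the member indexGlobalOnServer_ (filled by the call above) associates each
  // destination server rank with the list of global indexes this client must send to it.
  \endcode
*/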

/*!
   Compute, for each server, the global indexes that this client must send to it.
   \param [in] globalIndexOnClient global indexes held by the client
*/
//void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient,
//                                                                const CArray<int,1>& localIndexOnClient)
void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient)
{
  size_t ssize = globalIndexOnClient.numElements(), hashedIndex;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  // Number of global indexes whose server mapping can be resolved locally thanks to the index-server mapping
  int nbIndexAlreadyOnClient = 0;

  // Number of global indexes whose server mapping is held by other clients
  int nbIndexSendToOthers = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t globalIndexClient = globalIndexOnClient(i);
    hashedIndex  = hashGlobalIndex(globalIndexClient);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedIndex);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;

      if (clientRank_ == indexClient)
      {
        (indexGlobalOnServer_[globalIndexToServerMapping_[globalIndexClient]]).push_back(globalIndexClient);
        ++nbIndexAlreadyOnClient;
      }
      else
      {
        client2ClientIndexGlobal[indexClient].push_back(globalIndexClient);
        ++nbIndexSendToOthers;
      }
    }
  }

  int* sendBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator it  = client2ClientIndexGlobal.begin(),
                                                ite = client2ClientIndexGlobal.end();
  for (; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
      for (it = client2ClientIndexGlobal.begin(); it != ite; ++it)
         sendIndexGlobalToClients(it->first, it->second, clientIntraComm_, sendRequest);

  int nbDemandingClient = recvBuff[clientRank_], nbIndexServerReceived = 0;
  // Receiving the demands as well as the responses from other clients.
  // A demand message contains global indexes; a response carries the corresponding server indexes.
  // Buffer to receive demands from other clients; it is allocated only if demands are expected
  unsigned long* recvBuffIndexGlobal = 0;
  int maxNbIndexDemandedFromOthers = (nbIndexAlreadyOnClient >= globalIndexToServerMapping_.size())
                                   ? 0 : (globalIndexToServerMapping_.size() - nbIndexAlreadyOnClient);
  if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * globalIndexToServerMapping_.size(); // Not optimal, but it covers the general (non-distributed) case

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndexGlobal = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer to receive responses from other clients; it is allocated only if this client demands something from them
  int* recvBuffIndexServer = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
//  int nbIndexReceivedFromOthers = globalIndexToServerMapping_.size() - nbIndexAlreadyOnClient;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffIndexServer = new int[nbIndexReceivedFromOthers];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;


  resetReceivingRequestAndCount();
  while ((0 < nbDemandingClient) || (!sendRequest.empty()) ||
         (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    // Check whether this client has received any demand from other clients.
    // If it has, it must send the corresponding responses back to those clients
    probeIndexGlobalMessageFromClients(recvBuffIndexGlobal, maxNbIndexDemandedFromOthers);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndexGlobal_.begin();
           itRequest != requestRecvIndexGlobal_.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexGlobalBuffBegin_[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            client2ClientIndexServer[clientSourceRank].push_back(globalIndexToServerMapping_[*(beginBuff+i)]);
          }
          sendIndexServerToClients(clientSourceRank, client2ClientIndexServer[clientSourceRank], clientIntraComm_, sendRequest);
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
        requestRecvIndexGlobal_.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendRequest);

    // A client may also need to listen for responses from other clients about server information.
    // With this information, it can fill in its server-global index map.
    probeIndexServerMessageFromClients(recvBuffIndexServer, nbIndexReceivedFromOthers);
    for (itRequest = requestRecvIndexServer_.begin();
         itRequest != requestRecvIndexServer_.end();
         ++itRequest)
    {
      int flagIndexServer, count;
      MPI_Status statusIndexServer;

      MPI_Test(&(itRequest->second), &flagIndexServer, &statusIndexServer);
      if (true == flagIndexServer)
      {
        MPI_Get_count(&statusIndexServer, MPI_INT, &count);
        int clientSourceRank = statusIndexServer.MPI_SOURCE;
        int* beginBuff = indexServerBuffBegin_[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndexGlobal[clientSourceRank];
        for (int i = 0; i < count; ++i)
        {
          (indexGlobalOnServer_[*(beginBuff+i)]).push_back(globalIndexTmp[i]);
        }
        nbIndexServerReceived += count;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
      requestRecvIndexServer_.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  // These buffers were allocated with new[], so they must be released with delete []
  if (0 != recvBuffIndexGlobal) delete [] recvBuffIndexGlobal;
  if (0 != recvBuffIndexServer) delete [] recvBuffIndexServer;
  delete [] sendBuff;
  delete [] recvBuff;
}

/*!
  Compute the hash index distribution over the whole size_t space; each client is assigned one range of this distribution
*/
void CClientServerMappingDistributed::computeHashIndex()
{
  // Compute range of hash index for each client
  indexClientHash_.resize(nbClient_+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  indexClientHash_[0] = 0;
  for (int i = 1; i < nbClient_; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient_;
    if (i < (nbHashIndexMax%nbClient_)) ++nbHashIndex;
    indexClientHash_[i] = indexClientHash_[i-1] + nbHashIndex;
  }
  indexClientHash_[nbClient_] = nbHashIndexMax;
}
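
/*!
  Illustration of the partition built above (a sketch, not used by the code itself): client i owns
  the hash range [indexClientHash_[i], indexClientHash_[i+1]), so the owner of a hashed value h is
  recovered with the same std::upper_bound lookup performed in computeServerIndexMapping and
  computeDistributedServerIndex (the callers also guard against the end() case).
  \code
  // size_t h = hashGlobalIndex(someGlobalIndex);   // someGlobalIndex is hypothetical
  std::vector<size_t>::const_iterator itb = indexClientHash_.begin(),
                                      ite = indexClientHash_.end();
  std::vector<size_t>::const_iterator itBound = std::upper_bound(itb, ite, h);
  int ownerRank = std::distance(itb, itBound) - 1;
  \endcode
*/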

/*!
  Compute the distribution of global indexes over the servers.
  Each client already holds a piece of information about global indexes and their corresponding servers.
This information is redistributed over the hashed size_t space, in which each client owns a specific range of indexes.
After the redistribution, each client holds, for its range of indexes, all the necessary information about the servers.
  \param [in] globalIndexOfServer global indexes and their corresponding servers
  \param [in] clientIntraComm intra-communicator of the clients taking part in the distribution
*/
void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                    const MPI_Comm& clientIntraComm)
{
  int* sendBuff = new int[nbClient_];
  int* sendNbIndexBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute size of sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  boost::unordered_map<size_t,int>::const_iterator it  = globalIndexOfServer.begin(),
                                                   ite = globalIndexOfServer.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      if (clientRank_ == indexClient)
      {
        globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(it->first, it->second));
      }
      else
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndexGlobal[indexClient].push_back(it->first);
        client2ClientIndexServer[indexClient].push_back(it->second);
      }
    }
  }

  // Calculate from how many clients each client receives a message.
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbClient = recvBuff[clientRank_];

  // Calculate the size of the buffer for receiving messages
  int* recvNbIndexBuff = new int[nbClient_];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbIndexCount = recvNbIndexBuff[clientRank_];
  unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount];
  int* recvIndexServerBuff = new int[recvNbIndexCount];

  // If a client holds information about global indexes and servers that do not belong to it,
  // it sends a message to the clients that own them.
  // The contents of the message are the global indexes and their corresponding server indexes
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itGlobal  = client2ClientIndexGlobal.begin(),
                                                iteGlobal = client2ClientIndexGlobal.end();
  for ( ;itGlobal != iteGlobal; ++itGlobal)
    sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest);
  std::map<int, std::vector<int> >::iterator itServer  = client2ClientIndexServer.begin(),
                                             iteServer = client2ClientIndexServer.end();
  for (; itServer != iteServer; ++itServer)
    sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer;
  std::map<int, int> countBuffIndexServer, countBuffIndexGlobal;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Make sure that, before listening for messages, all index counters and receiving requests have been reset
  resetReceivingRequestAndCount();

  // Now each client tries to listen to demands from the others.
  // When a message arrives, it is processed: the global indexes and their corresponding servers are pushed into the map
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexGlobalMessageFromClients(recvIndexGlobalBuff, recvNbIndexCount);

    // Process the receiving requests that have completed
    for (itRequestIndexGlobal = requestRecvIndexGlobal_.begin();
         itRequestIndexGlobal != requestRecvIndexGlobal_.end();
         ++itRequestIndexGlobal)
    {
      int rank = itRequestIndexGlobal->first;
      int countIndexGlobal = computeBuffCountIndexGlobal(itRequestIndexGlobal->second);
      if (0 != countIndexGlobal)
        countBuffIndexGlobal[rank] = countIndexGlobal;
    }

    probeIndexServerMessageFromClients(recvIndexServerBuff, recvNbIndexCount);
    for (itRequestIndexServer = requestRecvIndexServer_.begin();
         itRequestIndexServer != requestRecvIndexServer_.end();
         ++itRequestIndexServer)
    {
      int rank = itRequestIndexServer->first;
      int countIndexServer = computeBuffCountIndexServer(itRequestIndexServer->second);
      if (0 != countIndexServer)
        countBuffIndexServer[rank] = countIndexServer;
    }

    // A rank is fully processed only once both its global-index and its server-index messages have arrived
    for (std::map<int, int>::iterator it = countBuffIndexGlobal.begin();
                                      it != countBuffIndexGlobal.end(); ++it)
    {
      int rank = it->first;
      if (countBuffIndexServer.end() != countBuffIndexServer.find(rank))
      {
        processReceivedRequest(indexGlobalBuffBegin_[rank], indexServerBuffBegin_[rank], it->second);
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvIndexServer_.erase(processedList[i]);
      requestRecvIndexGlobal_.erase(processedList[i]);
      countBuffIndexGlobal.erase(processedList[i]);
      countBuffIndexServer.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexGlobalBuff;
  delete [] recvIndexServerBuff;
}
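
/*!
  Note on the handshake used above (and in computeServerIndexMapping): every client fills a 0/1
  flag per destination rank, and the element-wise MPI_Allreduce with MPI_SUM leaves in
  recvBuff[clientRank_] the number of clients that will send to this rank. A minimal standalone
  sketch of the same pattern, with illustrative variable names:
  \code
  std::vector<int> willSendTo(nbClient, 0);  // set willSendTo[r] = 1 for each rank r we send to
  std::vector<int> nbSenders(nbClient, 0);
  MPI_Allreduce(&willSendTo[0], &nbSenders[0], nbClient, MPI_INT, MPI_SUM, clientIntraComm);
  int nbClientsSendingToMe = nbSenders[myRank];
  \endcode
*/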

/*!
  Probe and receive messages containing global indexes from other clients.
  Each client can send a message of global indexes to other clients to fulfill their maps.
  Each client probes its message queue and, if a message is ready, receives it into the dedicated buffer
  \param [in] recvIndexGlobalBuff buffer dedicated to receiving global indexes
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global index
  MPI_Iprobe(MPI_ANY_SOURCE, 15, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexGlobalBuffBegin_.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
    MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, 15, clientIntraComm_,
              &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
    countIndexGlobal_ += count;
  }
}

/*!
  Probe and receive messages containing server indexes from other clients.
  Each client can send a message of server indexes to other clients to fulfill their maps.
  Each client probes its message queue and, if a message is ready, receives it into the dedicated buffer
  \param [in] recvIndexServerBuff buffer dedicated to receiving server indexes
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexServer;
  int flagIndexServer, count;

  // Probing for server index
  MPI_Iprobe(MPI_ANY_SOURCE, 12, clientIntraComm_, &flagIndexServer, &statusIndexServer);
  if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexServer, MPI_INT, &count);
    indexServerBuffBegin_.insert(std::make_pair<int, int*>(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
    MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
              statusIndexServer.MPI_SOURCE, 12, clientIntraComm_,
              &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);

    countIndexServer_ += count;
  }
}

/*!
  Send a message containing global indexes to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexGlobal global indexes to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexGlobal list of pending send requests
*/
void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexGlobal)
{
  MPI_Request request;
  requestSendIndexGlobal.push_back(request);
  MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG,
            clientDestRank, 15, clientIntraComm, &(requestSendIndexGlobal.back()));
}

/*!
  Send a message containing server indexes to a client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexServer server indexes to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexServer list of pending send requests
*/
void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexServer)
{
  MPI_Request request;
  requestSendIndexServer.push_back(request);
  MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT,
            clientDestRank, 12, clientIntraComm, &(requestSendIndexServer.back()));
}
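
/*!
  Note: the two send routines above and the two probe routines earlier rely on a fixed tag
  convention within this class: tag 15 carries global indexes (unsigned long) and tag 12 carries
  server indexes (int). This is what lets a client distinguish the two message streams arriving
  from the same source rank.
*/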

/*!
  Test the status of the pending send requests and remove those that have completed
  \param [in] sendRequest list of send requests to test
*/
void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    // Scan the list and erase at most one completed request per pass
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}
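
/*!
  The loop above removes at most one completed request per pass over the list. An equivalent
  completion test could be written with MPI_Testsome on a contiguous vector; this is only a
  sketch of the alternative, not what XIOS does here, and the still-pending requests would have
  to be copied back into the list afterwards.
  \code
  std::vector<MPI_Request> reqs(sendRequest.begin(), sendRequest.end());
  std::vector<int> completedIdx(reqs.size());
  int nbCompleted = 0;
  if (!reqs.empty())
    MPI_Testsome((int)reqs.size(), &reqs[0], &nbCompleted, &completedIdx[0], MPI_STATUSES_IGNORE);
  \endcode
*/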

/*!
  Process a received request: push the global indexes and the corresponding server indexes into the map
  \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indexes
  \param[in] buffIndexServer pointer to the beginning of the buffer containing server indexes
  \param[in] count size of the received message
*/
void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
{
  for (int i = 0; i < count; ++i)
    globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(*(buffIndexGlobal+i),*(buffIndexServer+i)));
}

/*!
  Compute the size of a received message containing global indexes
  \param[in] requestRecv receiving request of the message
  \return number of received global indexes (0 if the message has not yet arrived)
*/
int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute the size of a received message containing server indexes
  \param[in] requestRecv receiving request of the message
  \return number of received server indexes (0 if the message has not yet arrived)
*/
int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

/*!
  Reset all receiving request maps and counters
*/
void CClientServerMappingDistributed::resetReceivingRequestAndCount()
{
  countIndexGlobal_ = countIndexServer_ = 0;
  requestRecvIndexGlobal_.clear();
  requestRecvIndexServer_.clear();
  indexGlobalBuffBegin_.clear();
  indexServerBuffBegin_.clear();
}

}