source: XIOS/trunk/src/client_server_mapping_distributed.cpp @ 631

Last change on this file since 631 was 630, checked in by mhnguyen, 9 years ago

Implement interpolation (polynomial) and correct some bugs

+) Implement interpolation (polynomial)
+) Correct some minor bugs relating to memory allocation
+) Clear some redundant code

Test
+) On Curie
+) test_client and test_complete pass

File size: 20.3 KB
/*!
   \file client_server_mapping_distributed.cpp
   \author Ha NGUYEN
   \since 27 Feb 2015
   \date 09 March 2015

   \brief Mapping between client indices and servers.
   Clients pre-compute all information about the server distribution.
 */
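// Overview: the hash space of global indices (size_t) is partitioned among the clients
// (computeHashIndex). Every known (global index, server) pair is redistributed to the client
// owning the hash of that global index (computeDistributedServerIndex), so that any client can
// later resolve the server of an arbitrary global index by querying its owning client
// (computeServerIndexMapping).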
#include "client_server_mapping_distributed.hpp"
#include <limits>
#include <boost/functional/hash.hpp>
#include "utils.hpp"

namespace xios
{

CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                 const MPI_Comm& clientIntraComm, bool isDataDistributed)
  : CClientServerMapping(), indexClientHash_(), countIndexGlobal_(0), countIndexServer_(0),
    indexGlobalBuffBegin_(), indexServerBuffBegin_(), requestRecvIndexServer_(), isDataDistributed_(isDataDistributed)
{
  clientIntraComm_ = clientIntraComm;
  MPI_Comm_size(clientIntraComm,&(nbClient_));
  MPI_Comm_rank(clientIntraComm,&clientRank_);
  computeHashIndex();
  computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
}

CClientServerMappingDistributed::~CClientServerMappingDistributed()
{
}

/*!
   Compute the mapping between the global indices held by the client and the servers to which they must be sent.
   \param [in] globalIndexOnClient global indices held by the client
*/
void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient)
{
  size_t ssize = globalIndexOnClient.numElements(), hashedIndex;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  // Number of global indices whose server mapping can be resolved locally via the index-server mapping
  int nbIndexAlreadyOnClient = 0;

  // Number of global indices whose server mapping is held by other clients
  int nbIndexSendToOthers = 0;
  HashXIOS<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t globalIndexClient = globalIndexOnClient(i);
    hashedIndex  = hashGlobalIndex(globalIndexClient);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedIndex);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        client2ClientIndexGlobal[indexClient].push_back(globalIndexClient);
        ++nbIndexSendToOthers;
      }
    }
  }

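  // Handshake: each client marks a 1 for every client it is going to query; after the sum-Allreduce,
  // recvBuff[clientRank_] is the number of clients that will demand index information from this client.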
  int* sendBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator itb  = client2ClientIndexGlobal.begin(), it,
                                                ite  = client2ClientIndexGlobal.end();
  for (it = itb; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
      for (it = itb; it != ite; ++it)
         sendIndexGlobalToClients(it->first, it->second, clientIntraComm_, sendRequest);

  int nbDemandingClient = recvBuff[clientRank_], nbIndexServerReceived = 0;

  // Receive demands as well as responses from other clients.
  // A demand message carries global indices, while a response carries the corresponding server indices.
  // The buffer receiving demands from other clients is only allocated if there are demands.
  // Duplicate indices may be demanded, so the maximum size of the demand buffer must be determined.
  for (it = itb; it != ite; ++it) sendBuff[it->first] = (it->second).size();
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

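  // After this second Allreduce, recvBuff[clientRank_] holds the total number of indices that other
  // clients will request from this client; it is used to size the demand-receiving buffer below.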
  unsigned long* recvBuffIndexGlobal = 0;
  int maxNbIndexDemandedFromOthers = recvBuff[clientRank_];
  if (!isDataDistributed_) maxNbIndexDemandedFromOthers = nbDemandingClient * nbIndexSendToOthers; //globalIndexToServerMapping_.size(); // Not very optimal but it's general

  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndexGlobal = new unsigned long[maxNbIndexDemandedFromOthers];

  // The buffer receiving responses from other clients is only allocated if this client has sent demands
  int* recvBuffIndexServer = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffIndexServer = new int[nbIndexReceivedFromOthers];

  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, repondAlreadyReceived;

  resetReceivingRequestAndCount();
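  // Loop until every demanding client has been answered, all non-blocking sends have completed,
  // and all the responses this client expects have been received.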
  while ((0 < nbDemandingClient) || (!sendRequest.empty()) ||
         (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    // Check whether this client has received any demand from other clients.
    // If so, it must send responses to those clients.
    probeIndexGlobalMessageFromClients(recvBuffIndexGlobal, maxNbIndexDemandedFromOthers);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndexGlobal_.begin();
           itRequest != requestRecvIndexGlobal_.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexGlobalBuffBegin_[clientSourceRank];
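          // Answer the demand: look up the server rank of every requested global index in the
          // locally owned part of the mapping, then send these ranks back to the demanding client.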
          for (int i = 0; i < count; ++i)
          {
            client2ClientIndexServer[clientSourceRank].push_back(globalIndexToServerMapping_[*(beginBuff+i)]);
          }
          sendIndexServerToClients(clientSourceRank, client2ClientIndexServer[clientSourceRank], clientIntraComm_, sendRequest);
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i< demandAlreadyReceived.size(); ++i)
        requestRecvIndexGlobal_.erase(demandAlreadyReceived[i]);
    }

    testSendRequest(sendRequest);

    // A client may also need to listen for responses from other clients carrying server information.
    // With this information, the client can fill in its server-global index map.
    probeIndexServerMessageFromClients(recvBuffIndexServer, nbIndexReceivedFromOthers);
    for (itRequest = requestRecvIndexServer_.begin();
         itRequest != requestRecvIndexServer_.end();
         ++itRequest)
    {
      int flagIndexServer, count;
      MPI_Status statusIndexServer;

      MPI_Test(&(itRequest->second), &flagIndexServer, &statusIndexServer);
      if (true == flagIndexServer)
      {
        MPI_Get_count(&statusIndexServer, MPI_INT, &count);
        int clientSourceRank = statusIndexServer.MPI_SOURCE;
        int* beginBuff = indexServerBuffBegin_[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndexGlobal[clientSourceRank];
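        // The i-th received server rank answers the i-th global index previously sent to clientSourceRank,
        // so each global index is appended to the list of indices held by its server.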
        for (int i = 0; i < count; ++i)
        {
          (indexGlobalOnServer_[*(beginBuff+i)]).push_back(globalIndexTmp[i]);
        }
        nbIndexServerReceived += count;
        repondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i< repondAlreadyReceived.size(); ++i)
      requestRecvIndexServer_.erase(repondAlreadyReceived[i]);
    repondAlreadyReceived.resize(0);
  }

  if (0 != maxNbIndexDemandedFromOthers) delete [] recvBuffIndexGlobal;
  if (0 != nbIndexReceivedFromOthers) delete [] recvBuffIndexServer;
  delete [] sendBuff;
  delete [] recvBuff;
}

/*!
  Compute the hash index distribution of the whole size_t space; each client is assigned a range of this distribution
*/
void CClientServerMappingDistributed::computeHashIndex()
{
  // Compute range of hash index for each client
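  // The hash space [0, numeric_limits<size_t>::max()] is split into nbClient_ nearly equal ranges:
  // client i is responsible for hashed indices in [indexClientHash_[i], indexClientHash_[i+1]).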
  indexClientHash_.resize(nbClient_+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  indexClientHash_[0] = 0;
  for (int i = 1; i < nbClient_; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient_;
    if (i < (nbHashIndexMax%nbClient_)) ++nbHashIndex;
    indexClientHash_[i] = indexClientHash_[i-1] + nbHashIndex;
  }
  indexClientHash_[nbClient_] = nbHashIndexMax;
}

/*!
  Compute the distribution of global indices to servers.
  Each client already holds a piece of information about global indices and their corresponding servers.
  This information is redistributed over the size_t hash space, in which each client owns a specific range of indices.
  After the redistribution, each client holds all the necessary server information for its range of indices.
  \param [in] globalIndexOfServer global indices and their corresponding servers
  \param [in] clientIntraComm communicator of the clients taking part in the distribution
*/
void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                    const MPI_Comm& clientIntraComm)
{
  int* sendBuff = new int[nbClient_];
  int* sendNbIndexBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute size of sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  boost::unordered_map<size_t,int>::const_iterator it  = globalIndexOfServer.begin(),
                                                   ite = globalIndexOfServer.end();
  HashXIOS<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndexGlobal[indexClient].push_back(it->first);
        client2ClientIndexServer[indexClient].push_back(it->second);
      }
    }
  }

  // Determine from how many clients each client receives messages.
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbClient = recvBuff[clientRank_];

  // Calculate size of buffer for receiving message
  int* recvNbIndexBuff = new int[nbClient_];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);
  int recvNbIndexCount = recvNbIndexBuff[clientRank_];
  unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount];
  int* recvIndexServerBuff = new int[recvNbIndexCount];

  // If a client holds information about global indices and servers that do not belong to it,
  // it sends a message to the clients that own them.
  // The message contains the global indices and their corresponding server indices.
  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itGlobal  = client2ClientIndexGlobal.begin(),
                                                iteGlobal = client2ClientIndexGlobal.end();
  for ( ;itGlobal != iteGlobal; ++itGlobal)
    sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest);
  std::map<int, std::vector<int> >::iterator itServer  = client2ClientIndexServer.begin(),
                                             iteServer = client2ClientIndexServer.end();
  for (; itServer != iteServer; ++itServer)
    sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest);

  std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer;
  std::map<int, int> countBuffIndexServer, countBuffIndexGlobal;
  std::vector<int> processedList;

  bool isFinished = (0 == recvNbClient) ? true : false;

  // Make sure that, before listening for messages, all index counters and receiving requests have been reset
  resetReceivingRequestAndCount();

  // Each client now listens for messages from the others.
  // When a message arrives, it is processed: the global indices and their corresponding servers are pushed into the local map.
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexGlobalMessageFromClients(recvIndexGlobalBuff, recvNbIndexCount);

    // Process completed receiving requests
    for (itRequestIndexGlobal = requestRecvIndexGlobal_.begin();
         itRequestIndexGlobal != requestRecvIndexGlobal_.end();
         ++itRequestIndexGlobal)
    {
      int rank = itRequestIndexGlobal->first;
      int countIndexGlobal = computeBuffCountIndexGlobal(itRequestIndexGlobal->second);
      if (0 != countIndexGlobal)
        countBuffIndexGlobal[rank] = countIndexGlobal;
    }

    probeIndexServerMessageFromClients(recvIndexServerBuff, recvNbIndexCount);
    for (itRequestIndexServer = requestRecvIndexServer_.begin();
         itRequestIndexServer != requestRecvIndexServer_.end();
         ++itRequestIndexServer)
    {
      int rank = itRequestIndexServer->first;
      int countIndexServer = computeBuffCountIndexServer(itRequestIndexServer->second);
      if (0 != countIndexServer)
        countBuffIndexServer[rank] = countIndexServer;
    }

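    // A pair of messages from a given rank can only be processed once both its global-index part
    // and its server-index part have been fully received.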
    for (std::map<int, int>::iterator it = countBuffIndexGlobal.begin();
                                      it != countBuffIndexGlobal.end(); ++it)
    {
      int rank = it->first;
      if (countBuffIndexServer.end() != countBuffIndexServer.find(rank))
      {
        processReceivedRequest(indexGlobalBuffBegin_[rank], indexServerBuffBegin_[rank], it->second);
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvIndexServer_.erase(processedList[i]);
      requestRecvIndexGlobal_.erase(processedList[i]);
      countBuffIndexGlobal.erase(processedList[i]);
      countBuffIndexServer.erase(processedList[i]);
    }

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexGlobalBuff;
  delete [] recvIndexServerBuff;
}

/*!
  Probe for and receive messages containing global indices from other clients.
  Each client can send a message of global indices to other clients so that they can fill in their maps.
  Each client probes its message queue; if a message is ready, it is received into the receiving buffer.
  \param [in] recvIndexGlobalBuff buffer dedicated to receiving global indices
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global index
  MPI_Iprobe(MPI_ANY_SOURCE, 15, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexGlobalBuffBegin_.insert(std::make_pair<int, unsigned long*>(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
    MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, 15, clientIntraComm_,
              &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
    countIndexGlobal_ += count;
  }
}

/*!
  Probe for and receive messages containing server indices from other clients.
  Each client can send a message of server indices to other clients so that they can fill in their maps.
  Each client probes its message queue; if a message is ready, it is received into the receiving buffer.
  \param [in] recvIndexServerBuff buffer dedicated to receiving server indices
  \param [in] recvNbIndexCount size of the buffer
*/
void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexServer;
  int flagIndexServer, count;

  // Probing for server index
  MPI_Iprobe(MPI_ANY_SOURCE, 12, clientIntraComm_, &flagIndexServer, &statusIndexServer);
  if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexServer, MPI_INT, &count);
    indexServerBuffBegin_.insert(std::make_pair<int, int*>(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
    MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
              statusIndexServer.MPI_SOURCE, 12, clientIntraComm_,
              &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);

    countIndexServer_ += count;
  }
}

/*!
  Send a message containing global indices to another client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexGlobal global indices to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexGlobal list of sending requests
*/
void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexGlobal)
{
  MPI_Request request;
  requestSendIndexGlobal.push_back(request);
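  // Tag 15 marks global-index messages; it matches the tag probed in probeIndexGlobalMessageFromClients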
  MPI_Isend(&(indexGlobal)[0], (indexGlobal).size(), MPI_UNSIGNED_LONG,
            clientDestRank, 15, clientIntraComm, &(requestSendIndexGlobal.back()));
}

/*!
  Send a message containing server indices to another client
  \param [in] clientDestRank rank of the destination client
  \param [in] indexServer server indices to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in] requestSendIndexServer list of sending requests
*/
void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexServer)
{
  MPI_Request request;
  requestSendIndexServer.push_back(request);
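  // Tag 12 marks server-index messages; it matches the tag probed in probeIndexServerMessageFromClients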
  MPI_Isend(&(indexServer)[0], (indexServer).size(), MPI_INT,
            clientDestRank, 12, clientIntraComm, &(requestSendIndexServer.back()));
}

/*!
  Check the status of pending send requests and erase those that have completed
  \param [in] sendRequest list of send requests to check
*/
void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
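  // Scan the list repeatedly; each pass erases at most one completed request, and up to
  // sizeListRequest passes are made, so all requests that have completed are removed from the list.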
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}

/*!
  Process a received request: insert the received (global index, server index) pairs into the map
  \param[in] buffIndexGlobal pointer to the beginning of the buffer containing global indices
  \param[in] buffIndexServer pointer to the beginning of the buffer containing server indices
  \param[in] count size of the received message
*/
void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
{
  for (int i = 0; i < count; ++i)
    globalIndexToServerMapping_.insert(std::make_pair<size_t,int>(*(buffIndexGlobal+i),*(buffIndexServer+i)));
}

/*!
  Compute the size of a received message containing global indices
  \param[in] requestRecv receiving request of the message
*/
int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

/*!
  Compute the size of a received message containing server indices
  \param[in] requestRecv receiving request of the message
*/
int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

/*!
  Reset all receiving request maps and counters
*/
void CClientServerMappingDistributed::resetReceivingRequestAndCount()
{
  countIndexGlobal_ = countIndexServer_ = 0;
  requestRecvIndexGlobal_.clear();
  requestRecvIndexServer_.clear();
  indexGlobalBuffBegin_.clear();
  indexServerBuffBegin_.clear();
}

}