source: XIOS/trunk/src/client_server_mapping_distributed.cpp @ 568

Last change on this file since 568 was 568, checked in by mhnguyen, 9 years ago

Implementing the discovery algorithm for server indexes

+) Implement the algorithm with only one level
+) Remove some redundant functions, correct some interfaces

Test
+) On Curie
+) Test passed and results are correct

File size: 19.3 KB
#include "client_server_mapping_distributed.hpp"
#include <limits>
#include <boost/functional/hash.hpp>

namespace xios
{

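/*!
  \brief Build the distributed client-server mapping.
  The global index-server mapping is distributed over all clients of the intra-communicator
  (one level only): each client stores the part of the mapping whose hashed global indexes
  fall into its own hash range.
  \param [in] globalIndexOfServer mapping of global index to server rank
  \param [in] clientIntraComm intra-communicator of the clients
*/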
CClientServerMappingDistributed::CClientServerMappingDistributed(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                 const MPI_Comm& clientIntraComm) : CClientServerMapping(), indexClientHash_()
{
  clientIntraComm_ = clientIntraComm;
  MPI_Comm_size(clientIntraComm, &nbClient_);
  MPI_Comm_rank(clientIntraComm, &clientRank_);
  computeDistributedServerIndex(globalIndexOfServer, clientIntraComm);
}

CClientServerMappingDistributed::~CClientServerMappingDistributed()
{
}

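/*!
  \brief Compute the mapping between global indexes on the client and their servers.
  Each client first resolves locally the indexes whose hash falls into its own range,
  then sends the remaining indexes as demands to the clients holding the corresponding
  parts of the distributed mapping and collects their responses.
  \param [in] globalIndexOnClient global indexes on the client
  \param [in] localIndexOnClient local indexes corresponding to the global ones
*/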
void CClientServerMappingDistributed::computeServerIndexMapping(const CArray<size_t,1>& globalIndexOnClient,
                                                                const CArray<int,1>& localIndexOnClient)
{
  size_t ssize = globalIndexOnClient.numElements(), hashedIndex;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;
  std::map<int, std::vector<int> > clientLocalIndex;

  // Number of global indexes whose server mapping can be resolved locally
  // thanks to the index-server mapping stored on this client
  int nbIndexAlreadyOnClient = 0;

  // Number of global indexes whose server mapping is held by other clients
  int nbIndexSendToOthers = 0;
  boost::hash<size_t> hashGlobalIndex;
  for (int i = 0; i < ssize; ++i)
  {
    size_t globalIndexClient = globalIndexOnClient(i);
    hashedIndex  = hashGlobalIndex(globalIndexClient);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashedIndex);
    if (iteClientHash != itClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;

      if (clientRank_ == indexClient)
      {
        (indexGlobalOnServer_[globalIndexToServerMapping_[globalIndexClient]]).push_back(globalIndexClient);
        (localIndexSend2Server_[globalIndexToServerMapping_[globalIndexClient]]).push_back(localIndexOnClient(i));
        ++nbIndexAlreadyOnClient;
      }
      else
      {
        client2ClientIndexGlobal[indexClient].push_back(globalIndexClient);
        clientLocalIndex[indexClient].push_back(i);
        ++nbIndexSendToOthers;
      }
    }
  }

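  // Let every client know how many other clients will send it a demand:
  // each client raises a flag for the destinations it will contact, and the
  // Allreduce sums these flags so that recvBuff[clientRank_] is the number
  // of demanding clients this client has to answer.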
  int* sendBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i) sendBuff[i] = 0;
  std::map<int, std::vector<size_t> >::iterator it  = client2ClientIndexGlobal.begin(),
                                                ite = client2ClientIndexGlobal.end();
  for (; it != ite; ++it) sendBuff[it->first] = 1;
  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm_);

  std::list<MPI_Request> sendRequest;
  if (0 != nbIndexSendToOthers)
    for (it = client2ClientIndexGlobal.begin(); it != ite; ++it)
      sendIndexGlobalToClients(it->first, it->second, clientIntraComm_, sendRequest);

  // Receive demands as well as responses from other clients.
  // A demand message carries global indexes; a response carries the corresponding server indexes.
  // Buffer for demands from other clients; only allocated if demands can actually arrive
  unsigned long* recvBuffIndexGlobal = 0;
  int maxNbIndexDemandedFromOthers = (nbIndexAlreadyOnClient >= globalIndexToServerMapping_.size())
                                   ? 0 : (globalIndexToServerMapping_.size() - nbIndexAlreadyOnClient);
  if (0 != maxNbIndexDemandedFromOthers)
    recvBuffIndexGlobal = new unsigned long[maxNbIndexDemandedFromOthers];

  // Buffer for responses from other clients; only allocated if this client has sent demands
  int* recvBuffIndexServer = 0;
  int nbIndexReceivedFromOthers = nbIndexSendToOthers;
  if (0 != nbIndexReceivedFromOthers)
    recvBuffIndexServer = new int[nbIndexReceivedFromOthers];

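  // Main exchange loop: keep probing until all demanding clients have been
  // answered, all non-blocking sends have completed, and all expected
  // responses have been received.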
  resetRequestAndCount();
  std::map<int, MPI_Request>::iterator itRequest;
  std::vector<int> demandAlreadyReceived, respondAlreadyReceived;
  int nbDemandingClient = recvBuff[clientRank_], nbIndexServerReceived = 0;
  while ((0 < nbDemandingClient) || (!sendRequest.empty()) ||
         (nbIndexServerReceived < nbIndexReceivedFromOthers))
  {
    // Check whether this client has received any demand from other clients.
    // If so, send responses back to those clients.
    probeIndexGlobalMessageFromClients(recvBuffIndexGlobal, maxNbIndexDemandedFromOthers);
    if (0 < nbDemandingClient)
    {
      for (itRequest = requestRecvIndexGlobal_.begin();
           itRequest != requestRecvIndexGlobal_.end(); ++itRequest)
      {
        int flagIndexGlobal, count;
        MPI_Status statusIndexGlobal;

        MPI_Test(&(itRequest->second), &flagIndexGlobal, &statusIndexGlobal);
        if (true == flagIndexGlobal)
        {
          MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
          int clientSourceRank = statusIndexGlobal.MPI_SOURCE;
          unsigned long* beginBuff = indexGlobalBuffBegin_[clientSourceRank];
          for (int i = 0; i < count; ++i)
          {
            client2ClientIndexServer[clientSourceRank].push_back(globalIndexToServerMapping_[*(beginBuff+i)]);
          }
          sendIndexServerToClients(clientSourceRank, client2ClientIndexServer[clientSourceRank], clientIntraComm_, sendRequest);
          --nbDemandingClient;

          demandAlreadyReceived.push_back(clientSourceRank);
        }
      }
      for (int i = 0; i < demandAlreadyReceived.size(); ++i)
        requestRecvIndexGlobal_.erase(demandAlreadyReceived[i]);
      demandAlreadyReceived.resize(0);
    }

    testSendRequest(sendRequest);

    // A client may also need to listen for responses from other clients carrying server information.
    // With this information, the client can fill in its server-global index map.
    probeIndexServerMessageFromClients(recvBuffIndexServer, nbIndexReceivedFromOthers);
    for (itRequest = requestRecvIndexServer_.begin();
         itRequest != requestRecvIndexServer_.end();
         ++itRequest)
    {
      int flagIndexServer, count;
      MPI_Status statusIndexServer;

      MPI_Test(&(itRequest->second), &flagIndexServer, &statusIndexServer);
      if (true == flagIndexServer)
      {
        MPI_Get_count(&statusIndexServer, MPI_INT, &count);
        int clientSourceRank = statusIndexServer.MPI_SOURCE;
        int* beginBuff = indexServerBuffBegin_[clientSourceRank];
        std::vector<size_t>& globalIndexTmp = client2ClientIndexGlobal[clientSourceRank];
        std::vector<int>& localIndexTmp = clientLocalIndex[clientSourceRank];
        for (int i = 0; i < count; ++i)
        {
          (indexGlobalOnServer_[*(beginBuff+i)]).push_back(globalIndexTmp[i]);
          (localIndexSend2Server_[*(beginBuff+i)]).push_back(localIndexOnClient(localIndexTmp[i]));
        }
        nbIndexServerReceived += count;
        respondAlreadyReceived.push_back(clientSourceRank);
      }
    }

    for (int i = 0; i < respondAlreadyReceived.size(); ++i)
      requestRecvIndexServer_.erase(respondAlreadyReceived[i]);
    respondAlreadyReceived.resize(0);
  }

  if (0 != recvBuffIndexGlobal) delete [] recvBuffIndexGlobal;
  if (0 != recvBuffIndexServer) delete [] recvBuffIndexServer;
  delete [] sendBuff;
  delete [] recvBuff;
}

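/*!
  \brief Compute the hash range assigned to each client.
  The full range of the hash function is divided among the clients;
  indexClientHash_[i] and indexClientHash_[i+1] delimit the range of client i.
*/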
void CClientServerMappingDistributed::computeHashIndex()
{
  // Compute range of hash index for each client
  indexClientHash_.resize(nbClient_+1);
  size_t nbHashIndexMax = std::numeric_limits<size_t>::max();
  size_t nbHashIndex;
  indexClientHash_[0] = 0;
  for (int i = 1; i < nbClient_; ++i)
  {
    nbHashIndex = nbHashIndexMax / nbClient_;
    if (i < (nbHashIndexMax%nbClient_)) ++nbHashIndex;
    indexClientHash_[i] = indexClientHash_[i-1] + nbHashIndex;
  }
  indexClientHash_[nbClient_] = nbHashIndexMax;
}

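/*!
  \brief Distribute the global index-server mapping among the clients.
  Each entry of globalIndexOfServer is hashed; entries whose hash falls into this
  client's range are stored locally, the others are sent to the client in charge
  of the corresponding range.
  \param [in] globalIndexOfServer mapping of global index to server rank
  \param [in] clientIntraComm intra-communicator of the clients
*/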
void CClientServerMappingDistributed::computeDistributedServerIndex(const boost::unordered_map<size_t,int>& globalIndexOfServer,
                                                                    const MPI_Comm& clientIntraComm)
{
  computeHashIndex();
  int clientRank;
  MPI_Comm_rank(clientIntraComm, &clientRank);

  int* sendBuff = new int[nbClient_];
  int* sendNbIndexBuff = new int[nbClient_];
  for (int i = 0; i < nbClient_; ++i)
  {
    sendBuff[i] = 0; sendNbIndexBuff[i] = 0;
  }

  // Compute the sizes of the sending and receiving buffers
  std::map<int, std::vector<size_t> > client2ClientIndexGlobal;
  std::map<int, std::vector<int> > client2ClientIndexServer;

  std::vector<size_t>::const_iterator itbClientHash = indexClientHash_.begin(), itClientHash,
                                      iteClientHash = indexClientHash_.end();
  boost::unordered_map<size_t,int>::const_iterator it  = globalIndexOfServer.begin(),
                                                   ite = globalIndexOfServer.end();
  boost::hash<size_t> hashGlobalIndex;
  for (; it != ite; ++it)
  {
    size_t hashIndex = hashGlobalIndex(it->first);
    itClientHash = std::upper_bound(itbClientHash, iteClientHash, hashIndex);
    if (itClientHash != iteClientHash)
    {
      int indexClient = std::distance(itbClientHash, itClientHash)-1;
      if (clientRank == indexClient)
      {
        globalIndexToServerMapping_.insert(std::make_pair(it->first, it->second));
      }
      else
      {
        sendBuff[indexClient] = 1;
        ++sendNbIndexBuff[indexClient];
        client2ClientIndexGlobal[indexClient].push_back(it->first);
        client2ClientIndexServer[indexClient].push_back(it->second);
      }
    }
  }

  int* recvBuff = new int[nbClient_];
  MPI_Allreduce(sendBuff, recvBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);

  int* recvNbIndexBuff = new int[nbClient_];
  MPI_Allreduce(sendNbIndexBuff, recvNbIndexBuff, nbClient_, MPI_INT, MPI_SUM, clientIntraComm);

  std::map<int, MPI_Request>::iterator itRequestIndexGlobal, itRequestIndexServer;
  std::map<int, int> countBuffIndexServer, countBuffIndexGlobal;
  std::vector<int> processedList;

  bool isFinished = false;
  int recvNbIndexCount = recvNbIndexBuff[clientRank];
  int recvNbClient = recvBuff[clientRank];
  unsigned long* recvIndexGlobalBuff = new unsigned long[recvNbIndexCount];
  int* recvIndexServerBuff = new int[recvNbIndexCount];

  std::list<MPI_Request> sendRequest;
  std::map<int, std::vector<size_t> >::iterator itGlobal  = client2ClientIndexGlobal.begin(),
                                                iteGlobal = client2ClientIndexGlobal.end();
  for (; itGlobal != iteGlobal; ++itGlobal)
    sendIndexGlobalToClients(itGlobal->first, itGlobal->second, clientIntraComm, sendRequest);
  std::map<int, std::vector<int> >::iterator itServer  = client2ClientIndexServer.begin(),
                                             iteServer = client2ClientIndexServer.end();
  for (; itServer != iteServer; ++itServer)
    sendIndexServerToClients(itServer->first, itServer->second, clientIntraComm, sendRequest);

  resetRequestAndCount();
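  // Exchange loop: a message of global indexes (tag 15) and its matching message
  // of server indexes (tag 12) from the same rank are processed together once both
  // have arrived; the loop ends when all expected messages have been processed and
  // all non-blocking sends have completed.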
  while (!isFinished || (!sendRequest.empty()))
  {
    testSendRequest(sendRequest);
    probeIndexGlobalMessageFromClients(recvIndexGlobalBuff, recvNbIndexCount);

    // Collect the completed receive requests of global indexes
    for (itRequestIndexGlobal = requestRecvIndexGlobal_.begin();
         itRequestIndexGlobal != requestRecvIndexGlobal_.end();
         ++itRequestIndexGlobal)
    {
      int rank = itRequestIndexGlobal->first;
      int countIndexGlobal = computeBuffCountIndexGlobal(itRequestIndexGlobal->second);
      if (0 != countIndexGlobal)
        countBuffIndexGlobal[rank] = countIndexGlobal;
    }

    probeIndexServerMessageFromClients(recvIndexServerBuff, recvNbIndexCount);
    for (itRequestIndexServer = requestRecvIndexServer_.begin();
         itRequestIndexServer != requestRecvIndexServer_.end();
         ++itRequestIndexServer)
    {
      int rank = itRequestIndexServer->first;
      int countIndexServer = computeBuffCountIndexServer(itRequestIndexServer->second);
      if (0 != countIndexServer)
        countBuffIndexServer[rank] = countIndexServer;
    }

    for (std::map<int, int>::iterator it = countBuffIndexGlobal.begin();
                                      it != countBuffIndexGlobal.end(); ++it)
    {
      int rank = it->first;
      if (countBuffIndexServer.end() != countBuffIndexServer.find(rank))
      {
        processReceivedRequest(indexGlobalBuffBegin_[rank], indexServerBuffBegin_[rank], it->second);
        processedList.push_back(rank);
        --recvNbClient;
      }
    }

    for (int i = 0; i < processedList.size(); ++i)
    {
      requestRecvIndexServer_.erase(processedList[i]);
      requestRecvIndexGlobal_.erase(processedList[i]);
      countBuffIndexGlobal.erase(processedList[i]);
      countBuffIndexServer.erase(processedList[i]);
    }
    processedList.resize(0);

    if (0 == recvNbClient) isFinished = true;
  }

  delete [] sendBuff;
  delete [] sendNbIndexBuff;
  delete [] recvBuff;
  delete [] recvNbIndexBuff;
  delete [] recvIndexGlobalBuff;
  delete [] recvIndexServerBuff;
}

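/*!
  \brief Probe for incoming messages of global indexes (demands) from other clients.
  When a message with tag 15 is detected, a non-blocking receive is posted into the
  next free part of the receive buffer and registered in requestRecvIndexGlobal_.
  \param [in] recvIndexGlobalBuff buffer into which global indexes are received
  \param [in] recvNbIndexCount maximum number of indexes that can be received
*/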
void CClientServerMappingDistributed::probeIndexGlobalMessageFromClients(unsigned long* recvIndexGlobalBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexGlobal;
  int flagIndexGlobal, count;

  // Probing for global index
  MPI_Iprobe(MPI_ANY_SOURCE, 15, clientIntraComm_, &flagIndexGlobal, &statusIndexGlobal);
  if ((true == flagIndexGlobal) && (countIndexGlobal_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexGlobal, MPI_UNSIGNED_LONG, &count);
    indexGlobalBuffBegin_.insert(std::make_pair(statusIndexGlobal.MPI_SOURCE, recvIndexGlobalBuff+countIndexGlobal_));
    MPI_Irecv(recvIndexGlobalBuff+countIndexGlobal_, count, MPI_UNSIGNED_LONG,
              statusIndexGlobal.MPI_SOURCE, 15, clientIntraComm_,
              &requestRecvIndexGlobal_[statusIndexGlobal.MPI_SOURCE]);
    countIndexGlobal_ += count;
  }
}

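/*!
  \brief Probe for incoming messages of server indexes (responses) from other clients.
  When a message with tag 12 is detected, a non-blocking receive is posted into the
  next free part of the receive buffer and registered in requestRecvIndexServer_.
  \param [in] recvIndexServerBuff buffer into which server indexes are received
  \param [in] recvNbIndexCount maximum number of indexes that can be received
*/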
void CClientServerMappingDistributed::probeIndexServerMessageFromClients(int* recvIndexServerBuff, int recvNbIndexCount)
{
  MPI_Status statusIndexServer;
  int flagIndexServer, count;

  // Probing for server index
  MPI_Iprobe(MPI_ANY_SOURCE, 12, clientIntraComm_, &flagIndexServer, &statusIndexServer);
  if ((true == flagIndexServer) && (countIndexServer_ < recvNbIndexCount))
  {
    MPI_Get_count(&statusIndexServer, MPI_INT, &count);
    indexServerBuffBegin_.insert(std::make_pair(statusIndexServer.MPI_SOURCE, recvIndexServerBuff+countIndexServer_));
    MPI_Irecv(recvIndexServerBuff+countIndexServer_, count, MPI_INT,
              statusIndexServer.MPI_SOURCE, 12, clientIntraComm_,
              &requestRecvIndexServer_[statusIndexServer.MPI_SOURCE]);
    countIndexServer_ += count;
  }
}

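/*!
  \brief Send a list of global indexes (a demand) to a client with a non-blocking send (tag 15).
  \param [in] clientDestRank rank of the destination client
  \param [in] indexGlobal global indexes to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in,out] requestSendIndexGlobal list collecting the pending send requests
*/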
void CClientServerMappingDistributed::sendIndexGlobalToClients(int clientDestRank, std::vector<size_t>& indexGlobal,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexGlobal)
{
  MPI_Request request;
  requestSendIndexGlobal.push_back(request);
  MPI_Isend(&indexGlobal[0], indexGlobal.size(), MPI_UNSIGNED_LONG,
            clientDestRank, 15, clientIntraComm, &(requestSendIndexGlobal.back()));
}

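/*!
  \brief Send a list of server indexes (a response) to a client with a non-blocking send (tag 12).
  \param [in] clientDestRank rank of the destination client
  \param [in] indexServer server indexes to send
  \param [in] clientIntraComm intra-communicator of the clients
  \param [in,out] requestSendIndexServer list collecting the pending send requests
*/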
void CClientServerMappingDistributed::sendIndexServerToClients(int clientDestRank, std::vector<int>& indexServer,
                                                               const MPI_Comm& clientIntraComm,
                                                               std::list<MPI_Request>& requestSendIndexServer)
{
  MPI_Request request;
  requestSendIndexServer.push_back(request);
  MPI_Isend(&indexServer[0], indexServer.size(), MPI_INT,
            clientDestRank, 12, clientIntraComm, &(requestSendIndexServer.back()));
}

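/*!
  \brief Test the pending send requests and remove the completed ones from the list.
  \param [in,out] sendRequest list of pending non-blocking send requests
*/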
void CClientServerMappingDistributed::testSendRequest(std::list<MPI_Request>& sendRequest)
{
  int flag = 0;
  MPI_Status status;
  std::list<MPI_Request>::iterator itRequest;
  int sizeListRequest = sendRequest.size();
  int idx = 0;
  while (idx < sizeListRequest)
  {
    bool isErased = false;
    for (itRequest = sendRequest.begin(); itRequest != sendRequest.end(); ++itRequest)
    {
      MPI_Test(&(*itRequest), &flag, &status);
      if (true == flag)
      {
        --sizeListRequest;
        isErased = true;
        break;
      }
    }
    if (true == isErased) sendRequest.erase(itRequest);
    ++idx;
  }
}

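/*!
  \brief Insert a received list of (global index, server index) pairs into the local mapping.
  \param [in] buffIndexGlobal buffer of received global indexes
  \param [in] buffIndexServer buffer of received server indexes
  \param [in] count number of pairs in the buffers
*/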
void CClientServerMappingDistributed::processReceivedRequest(unsigned long* buffIndexGlobal, int* buffIndexServer, int count)
{
  for (int i = 0; i < count; ++i)
    globalIndexToServerMapping_.insert(std::make_pair(*(buffIndexGlobal+i), *(buffIndexServer+i)));
}

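/*!
  \brief Return the number of received global indexes if the receive request has completed, 0 otherwise.
  \param [in] requestRecv the receive request to test
*/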
int CClientServerMappingDistributed::computeBuffCountIndexGlobal(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_UNSIGNED_LONG, &count);
  }

  return count;
}

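/*!
  \brief Return the number of received server indexes if the receive request has completed, 0 otherwise.
  \param [in] requestRecv the receive request to test
*/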
int CClientServerMappingDistributed::computeBuffCountIndexServer(MPI_Request& requestRecv)
{
  int flag, count = 0;
  MPI_Status status;

  MPI_Test(&requestRecv, &flag, &status);
  if (true == flag)
  {
    MPI_Get_count(&status, MPI_INT, &count);
  }

  return count;
}

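/*!
  \brief Reset the receive request maps, the buffer-begin maps and the receive counters
  before a new exchange.
*/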
void CClientServerMappingDistributed::resetRequestAndCount()
{
  countIndexGlobal_ = countIndexServer_ = 0;
  requestRecvIndexGlobal_.clear();
  requestRecvIndexServer_.clear();
  indexGlobalBuffBegin_.clear();
  indexServerBuffBegin_.clear();
}

} // namespace xios