source: XIOS3/trunk/src/manager/services_manager.cpp @ 2498

Last change on this file since 2498 was 2498, checked in by jderouillat, 15 months ago

Revert 2494 partially (keep initialisation of notifyType_), the associated wait managment causes deadlock in dynamico like test cases on JeanZay?

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.9 KB
RevLine 
[1761]1#include "daemons_manager.hpp"
2#include "services_manager.hpp"
3#include "ressources_manager.hpp"
4#include "cxios.hpp"
5#include "pool_ressource.hpp"
6#include "type.hpp"
7#include "server.hpp"
8#include "servers_ressource.hpp"
[2246]9#include "timer.hpp"
[1761]10
11namespace xios
12{
13
14 
15
16  CServicesManager::CServicesManager(bool isXiosServer)
17  {
18   
19    int commRank ; 
20    xiosComm_ = CXios::getXiosComm() ;
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22   
23   
24    // The global manager leader will be the process of rank 0
25    // By "xiosComm" communicator construction
26    // - if servers exits it will be the root process of the servers communicator
27    // - otherwise the root process of the first model
28   
29    managerGlobalLeader_ = 0 ;
30
31    MPI_Comm_rank(xiosComm_, &commRank) ;
32    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
33    winNotify_->lockWindow(commRank,0) ;
34    winNotify_->updateToWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;
35    winNotify_->unlockWindow(commRank,0) ;
36
37    winServices_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
38    winServices_->lockWindow(commRank,0) ;
39    winServices_->updateToWindow(commRank, this, &CServicesManager::servicesDumpOut) ;
40    winServices_->unlockWindow(commRank,0) ;
41
42    MPI_Barrier(xiosComm_)  ;   
43  }
44
[1764]45  CServicesManager::~CServicesManager()
46  {
47    delete winNotify_ ;
48    delete winServices_ ;
49  }
50
[1761]51  bool CServicesManager::createServices(const std::string& poolId, const std::string& serviceId, 
52                                        int type, int size, int nbPartitions, bool wait) 
53  {
54
55    int leader ;
[2458]56    int poolSize, poolFreeSize ;
[1761]57   
[2246]58    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ;
[2458]59    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
[1761]60    if (wait)
61    {
62      while (!ok) 
63      {
64        CXios::getDaemonsManager()->eventLoop() ;
[2458]65        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
[1761]66      }
67    }
68
69    if (ok) 
70    {
[2246]71      info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ;
[2458]72      CXios::getRessourcesManager()->decreasePoolFreeSize(poolId ,size) ;
[1761]73      createServicesNotify(leader, serviceId, type, size, nbPartitions) ;
74      return true ;
75    }
76    else return false ;
77  }
78
[2404]79  bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, int type, const std::string& OnServiceId, bool wait)
[2403]80  {
[1761]81
[2403]82    int leader ;
83    int poolSize ;
[2458]84    int poolFreeSize ;
[2403]85   
86    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ;
[2458]87    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
[2403]88    if (wait)
89    {
90      while (!ok) 
91      {
92        CXios::getDaemonsManager()->eventLoop() ;
[2458]93        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
[2403]94      }
95    }
96
97    if (ok) 
98    {
99      info(40)<<"CServicesManager : create service on other, notification to leader "<<leader<<", serviceId : "<<serviceId<<", service onto : "<<OnServiceId<<endl ;
[2404]100      createServicesOntoNotify(leader, serviceId, type, OnServiceId) ;
[2403]101      return true ;
102    }
103    else return false ;
104  }
105
[1761]106  void CServicesManager::createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions)
107  {
[2403]108    notifyType_=NOTIFY_CREATE_SERVICE ;
109    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions ) ;
110    sendNotification(rank) ;
[1761]111  }
112
[2403]113
[2404]114  void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, int type, const string& OnServiceId)
[1761]115  {
[2403]116    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
[2404]117    notifyCreateServiceOnto_=make_tuple(serviceId, type, OnServiceId) ;
[2403]118    sendNotification(rank) ;
119  }
[1761]120
[2403]121  void CServicesManager::sendNotification(int rank)
122  {
123    winNotify_->lockWindowExclusive(rank) ;
124    winNotify_->pushToLockedWindow(rank, this, &CServicesManager::notificationsDumpOut) ;
125    winNotify_->unlockWindow(rank) ;
[1761]126  }
127
[2403]128 
[1761]129  void CServicesManager::eventLoop(void)
130  {
[2246]131    CTimer::get("CServicesManager::eventLoop").resume();
[2260]132    int flag ;
133    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
[2246]134    double time=MPI_Wtime() ;
135    if (time-lastEventLoop_ > eventLoopLatency_) 
136    {
[2403]137      checkNotifications() ;
[2246]138      lastEventLoop_=time ;
139    }
140    CTimer::get("CServicesManager::eventLoop").suspend();
[1761]141  }
142
[2403]143
144
145  void CServicesManager::checkNotifications(void)
146  {
147    int commRank ;
148    MPI_Comm_rank(xiosComm_, &commRank) ;
149    winNotify_->lockWindowExclusive(commRank) ;
150    winNotify_->popFromLockedWindow(commRank, this, &CServicesManager::notificationsDumpIn) ;
151    winNotify_->unlockWindow(commRank) ;
152    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
153    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
154  }
155
156  void CServicesManager::createService(void)
157  {
158    auto& arg=notifyCreateService_ ;
159    CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
160  }
161
162  void CServicesManager::createServiceOnto(void)
163  {
[2404]164    auto& arg=notifyCreateServiceOnto_ ;
165    CServer::getServersRessource()->getPoolRessource()->createServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
[2403]166  }
167
[1761]168  void CServicesManager::notificationsDumpOut(CBufferOut& buffer)
169  {
[2403]170
[1761]171    buffer.realloc(maxBufferSize_) ;
172   
[2403]173    if (notifyType_==NOTIFY_CREATE_SERVICE)
174    {
175      auto& arg=notifyCreateService_ ;
176      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
177    }
178    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
179    {
180      auto& arg=notifyCreateServiceOnto_ ;
[2404]181      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << get<2>(arg)  ;
[2403]182    }
[1761]183  }
184
185  void CServicesManager::notificationsDumpIn(CBufferIn& buffer)
186  {
[2403]187    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
188    else
[1761]189    {
[2403]190      buffer>>notifyType_;
191      if (notifyType_==NOTIFY_CREATE_SERVICE)
192      {
193        auto& arg=notifyCreateService_ ;
194        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
195      }
196      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
197      {
198        auto& arg=notifyCreateServiceOnto_ ;
[2404]199        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
[2403]200      }
[1761]201    }
[2403]202  } 
[1761]203 
204  void CServicesManager::servicesDumpOut(CBufferOut& buffer)
205  {
206   
207    buffer.realloc(maxBufferSize_) ;
208   
209    buffer<<(int)services_.size();
210   
211    for(auto it=services_.begin();it!=services_.end(); ++it)
212    { 
213      auto key = it->first ;
214      auto val = it->second ; 
215      buffer << std::get<0>(key) << std::get<1>(key) << std::get<2>(key) 
216             <<  static_cast<int>(std::get<0>(val)) << std::get<1>(val) << std::get<2>(val) << std::get<3>(val) ;
217    }
218  }
219
220  void CServicesManager::servicesDumpIn(CBufferIn& buffer)
221  {
222    std::string poolId, serviceId ;
223    int partitionId ;
224    int type ;
225    int size; 
226    int nbPartitions ;
227    int leader ;
228
229    services_.clear() ;
230    int nbServices ;
231    buffer>>nbServices ;
[2494]232
[1761]233    for(int i=0;i<nbServices;i++) 
234    {
235      buffer>>poolId>>serviceId>>partitionId>>type>>size>>nbPartitions>>leader ;
236      services_[std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
237    }
238  }
239
240  void CServicesManager::registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, 
241                                         int size, int nbPartitions, int leader)
242  {
243   
[2246]244    info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ;
245
246    winServices_->lockWindowExclusive(managerGlobalLeader_) ;
247    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
248    winServices_->flushWindow(managerGlobalLeader_) ;
249    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
250    winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ;
251    winServices_->unlockWindow(managerGlobalLeader_) ;
252
[1761]253  }
254
255  bool CServicesManager::getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, 
[2458]256                                        int& size, int& nbPartitions, int& leader, bool wait)
[1761]257  {
[2246]258   
259    winServices_->lockWindowShared(managerGlobalLeader_) ;
260    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
261    winServices_->unlockWindow(managerGlobalLeader_) ;
[1761]262
263    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ;
[2458]264    if ( it == services_.end() && !wait) return false ;
[1761]265    else
266    {
[2458]267      if (wait) waitServiceRegistration(poolId, serviceId, partitionId) ;
[1761]268      type= std::get<0>(it->second); 
269      size= std::get<1>(it->second); 
270      nbPartitions = std::get<2>(it->second); 
271      leader = std::get<3>(it->second); 
272      return true ;
273    }
274  }
275
[2458]276  bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader, bool wait)
[1761]277  {
278    int type;
279    int size ;
280    int nbPartitions;
[2498]281    //return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader, wait) ;
282    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
[1761]283  }
284
[2458]285  bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, bool wait)
[1761]286  {
287    int size ;
288    int nbPartitions;
289    int leader;
[2498]290    //return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader, wait) ;
291    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
[1761]292  }
293
[2458]294  bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions, bool wait)
[1761]295  {
296    int size ;
297    int type;
298    int leader;
[2498]299    //return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader, wait) ;
300    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
[1761]301  }
302
303  bool CServicesManager::hasService(const std::string& poolId, const std::string& serviceId, const int& partitionId)
304  {
305    winServices_->lockWindow(managerGlobalLeader_,0) ;
306    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
307    winServices_->unlockWindow(managerGlobalLeader_,0) ;
308    auto it=services_.find(std::tuple<std::string, std::string, int>(poolId, serviceId, partitionId)) ;
309    if ( it == services_.end()) return false ;
310    else return true ;
311  }
[2458]312 
313  void CServicesManager::waitServiceRegistration(const std::string& poolId, const std::string& serviceId, const int& partitionId)
314  {
315    while(!hasService(poolId,serviceId,partitionId)) CXios::getDaemonsManager()->servicesEventLoop() ;
316  }
[1761]317
[2458]318}
Note: See TracBrowser for help on using the repository browser.