source: XIOS3/trunk/src/manager/services_manager.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 7 weeks ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.8 KB
Line 
1#include "daemons_manager.hpp"
2#include "services_manager.hpp"
3#include "ressources_manager.hpp"
4#include "cxios.hpp"
5#include "pool_ressource.hpp"
6#include "type.hpp"
7#include "server.hpp"
8#include "servers_ressource.hpp"
9#include "timer.hpp"
10
11namespace xios
12{
13  extern CLogType logTimers ;
14 
15
16  CServicesManager::CServicesManager(bool isXiosServer)
17  {
18   
19    int commRank ; 
20    xiosComm_ = CXios::getXiosComm() ;
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22   
23   
24    // The global manager leader will be the process of rank 0
25    // By "xiosComm" communicator construction
26    // - if servers exits it will be the root process of the servers communicator
27    // - otherwise the root process of the first model
28   
29    managerGlobalLeader_ = 0 ;
30
31    MPI_Comm_rank(xiosComm_, &commRank) ;
32    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_,"CServicesManager::winNotify_") ;
33    winNotify_->updateToExclusiveWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;
34
35    winServices_ = new CWindowManager(xiosComm_, maxBufferSize_,"CServicesManager::winServices_") ;
36    winServices_->updateToExclusiveWindow(commRank, this, &CServicesManager::servicesDumpOut) ;
37   
38    MPI_Barrier(xiosComm_)  ;   
39  }
40
41  CServicesManager::~CServicesManager()
42  {
43    delete winNotify_ ;
44    delete winServices_ ;
45  }
46
47  bool CServicesManager::createServices(const std::string& poolId, const std::string& serviceId, 
48                                        int type, int size, int nbPartitions, bool wait) 
49  {
50
51    int leader ;
52    int poolSize, poolFreeSize ;
53   
54    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ;
55    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
56    if (wait)
57    {
58      while (!ok) 
59      {
60        CXios::getDaemonsManager()->eventLoop() ;
61        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
62      }
63    }
64
65    if (ok) 
66    {
67      info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ;
68      CXios::getRessourcesManager()->decreasePoolFreeSize(poolId ,size) ;
69      createServicesNotify(leader, serviceId, type, size, nbPartitions) ;
70      return true ;
71    }
72    else return false ;
73  }
74
75  bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, int type, const std::string& OnServiceId, bool wait)
76  {
77
78    int leader ;
79    int poolSize ;
80    int poolFreeSize ;
81   
82    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ;
83    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
84    if (wait)
85    {
86      while (!ok) 
87      {
88        CXios::getDaemonsManager()->eventLoop() ;
89        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ;
90      }
91    }
92
93    if (ok) 
94    {
95      info(40)<<"CServicesManager : create service on other, notification to leader "<<leader<<", serviceId : "<<serviceId<<", service onto : "<<OnServiceId<<endl ;
96      createServicesOntoNotify(leader, serviceId, type, OnServiceId) ;
97      return true ;
98    }
99    else return false ;
100  }
101
102  void CServicesManager::createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions)
103  {
104    notifyType_=NOTIFY_CREATE_SERVICE ;
105    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions ) ;
106    sendNotification(rank) ;
107  }
108
109
110  void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, int type, const string& OnServiceId)
111  {
112    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
113    notifyCreateServiceOnto_=make_tuple(serviceId, type, OnServiceId) ;
114    sendNotification(rank) ;
115  }
116
117  void CServicesManager::sendNotification(int rank)
118  {
119    winNotify_->pushToExclusiveWindow(rank, this, &CServicesManager::notificationsDumpOut) ;
120  }
121
122 
123  void CServicesManager::eventLoop(void)
124  {
125    if (info.isActive(logTimers)) CTimer::get("CServicesManager::eventLoop").resume();
126    int flag ;
127    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
128    double time=MPI_Wtime() ;
129    if (time-lastEventLoop_ > eventLoopLatency_) 
130    {
131      checkNotifications() ;
132      lastEventLoop_=time ;
133    }
134    if (info.isActive(logTimers)) CTimer::get("CServicesManager::eventLoop").suspend();
135  }
136
137
138
139  void CServicesManager::checkNotifications(void)
140  {
141    int commRank ;
142    MPI_Comm_rank(xiosComm_, &commRank) ;
143    winNotify_->popFromExclusiveWindow(commRank, this, &CServicesManager::notificationsDumpIn) ;
144    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
145    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
146  }
147
148  void CServicesManager::createService(void)
149  {
150    auto& arg=notifyCreateService_ ;
151    CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
152  }
153
154  void CServicesManager::createServiceOnto(void)
155  {
156    auto& arg=notifyCreateServiceOnto_ ;
157    CServer::getServersRessource()->getPoolRessource()->createServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
158  }
159
160  void CServicesManager::notificationsDumpOut(CBufferOut& buffer)
161  {
162
163    buffer.realloc(maxBufferSize_) ;
164   
165    if (notifyType_==NOTIFY_CREATE_SERVICE)
166    {
167      auto& arg=notifyCreateService_ ;
168      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
169    }
170    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
171    {
172      auto& arg=notifyCreateServiceOnto_ ;
173      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << get<2>(arg)  ;
174    }
175  }
176
177  void CServicesManager::notificationsDumpIn(CBufferIn& buffer)
178  {
179    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
180    else
181    {
182      buffer>>notifyType_;
183      if (notifyType_==NOTIFY_CREATE_SERVICE)
184      {
185        auto& arg=notifyCreateService_ ;
186        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
187      }
188      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
189      {
190        auto& arg=notifyCreateServiceOnto_ ;
191        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
192      }
193    }
194  } 
195 
196  void CServicesManager::servicesDumpOut(CBufferOut& buffer)
197  {
198   
199    buffer.realloc(maxBufferSize_) ;
200   
201    buffer<<(int)services_.size();
202   
203    for(auto it=services_.begin();it!=services_.end(); ++it)
204    { 
205      auto key = it->first ;
206      auto val = it->second ; 
207      buffer << std::get<0>(key) << std::get<1>(key) << std::get<2>(key) 
208             <<  static_cast<int>(std::get<0>(val)) << std::get<1>(val) << std::get<2>(val) << std::get<3>(val) ;
209    }
210  }
211
212  void CServicesManager::servicesDumpIn(CBufferIn& buffer)
213  {
214    std::string poolId, serviceId ;
215    int partitionId ;
216    int type ;
217    int size; 
218    int nbPartitions ;
219    int leader ;
220
221    int nbServices ;
222    buffer>>nbServices ;
223    bool newServices = nbServices != services_.size() ; 
224
225    services_.clear() ;
226
227    for(int i=0;i<nbServices;i++) 
228    {
229      buffer>>poolId>>serviceId>>partitionId>>type>>size>>nbPartitions>>leader ;
230      services_[std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
231      if (newServices)
232        info(40)<<"Receive new services informations : "<<poolId<<"::"<<serviceId<<"::"<<partitionId<<" => type : "<<type<<"  size : "<<size<<"  nbPartitions : "<<nbPartitions<<"  leader : "<<leader<<endl ;
233    }
234  }
235
236  void CServicesManager::registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, 
237                                         int size, int nbPartitions, int leader)
238  {
239   
240    info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ;
241
242    winServices_->lockWindowExclusive(managerGlobalLeader_) ;
243    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
244    winServices_->flushWindow(managerGlobalLeader_) ;
245    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
246    winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ;
247    winServices_->unlockWindowExclusive(managerGlobalLeader_) ;
248
249  }
250
251  bool CServicesManager::getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, 
252                                        int& size, int& nbPartitions, int& leader, bool wait)
253  {
254    winServices_->updateFromSharedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
255   
256    if (wait) waitServiceRegistration(poolId, serviceId, partitionId) ;
257    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ;
258    if ( it == services_.end()) return false ;
259    else
260    {
261      type= std::get<0>(it->second); 
262      size= std::get<1>(it->second); 
263      nbPartitions = std::get<2>(it->second); 
264      leader = std::get<3>(it->second); 
265      return true ;
266    }
267  }
268
269  bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader, bool wait)
270  {
271    int type;
272    int size ;
273    int nbPartitions;
274    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader, wait) ;
275  }
276
277  bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, bool wait)
278  {
279    int size ;
280    int nbPartitions;
281    int leader;
282    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader, wait) ;
283  }
284
285  bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions, bool wait)
286  {
287    int size ;
288    int type;
289    int leader;
290    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader, wait) ;
291  }
292
293  bool CServicesManager::hasService(const std::string& poolId, const std::string& serviceId, const int& partitionId)
294  {
295    winServices_->updateFromSharedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
296    auto it=services_.find(std::tuple<std::string, std::string, int>(poolId, serviceId, partitionId)) ;
297    if ( it == services_.end()) return false ;
298    else return true ;
299  }
300 
301  void CServicesManager::waitServiceRegistration(const std::string& poolId, const std::string& serviceId, const int& partitionId)
302  {
303    while(!hasService(poolId,serviceId,partitionId)) CXios::getDaemonsManager()->servicesEventLoop() ;
304  }
305
306  void CServicesManager::waitServiceRegistration(const std::string& poolId, const std::string& serviceId)
307  {
308    int nbPartition ;
309    getServiceNbPartitions(poolId,serviceId,0,nbPartition, true) ;
310    for(int n=1;n<nbPartition;n++) waitServiceRegistration(poolId, serviceId, n) ;
311
312  }
313}
Note: See TracBrowser for help on using the repository browser.