source: XIOS3/trunk/src/manager/services_manager.cpp @ 2403

Last change on this file since 2403 was 2403, checked in by ymipsl, 21 months ago

Refactor service manager notification
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.0 KB
Line 
1#include "daemons_manager.hpp"
2#include "services_manager.hpp"
3#include "ressources_manager.hpp"
4#include "cxios.hpp"
5#include "pool_ressource.hpp"
6#include "type.hpp"
7#include "server.hpp"
8#include "servers_ressource.hpp"
9#include "timer.hpp"
10
11namespace xios
12{
13
14 
15
16  CServicesManager::CServicesManager(bool isXiosServer)
17  {
18   
19    int commRank ; 
20    xiosComm_ = CXios::getXiosComm() ;
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22   
23   
24    // The global manager leader will be the process of rank 0
25    // By "xiosComm" communicator construction
26    // - if servers exits it will be the root process of the servers communicator
27    // - otherwise the root process of the first model
28   
29    managerGlobalLeader_ = 0 ;
30
31    MPI_Comm_rank(xiosComm_, &commRank) ;
32    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
33    winNotify_->lockWindow(commRank,0) ;
34    winNotify_->updateToWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;
35    winNotify_->unlockWindow(commRank,0) ;
36
37    winServices_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
38    winServices_->lockWindow(commRank,0) ;
39    winServices_->updateToWindow(commRank, this, &CServicesManager::servicesDumpOut) ;
40    winServices_->unlockWindow(commRank,0) ;
41
42    MPI_Barrier(xiosComm_)  ;   
43  }
44
45  CServicesManager::~CServicesManager()
46  {
47    delete winNotify_ ;
48    delete winServices_ ;
49  }
50
51  bool CServicesManager::createServices(const std::string& poolId, const std::string& serviceId, 
52                                        int type, int size, int nbPartitions, bool wait) 
53  {
54
55    int leader ;
56    int poolSize ;
57   
58    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ;
59    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
60    if (wait)
61    {
62      while (!ok) 
63      {
64        CXios::getDaemonsManager()->eventLoop() ;
65        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
66      }
67    }
68
69    if (ok) 
70    {
71      info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ;
72      createServicesNotify(leader, serviceId, type, size, nbPartitions) ;
73      return true ;
74    }
75    else return false ;
76  }
77
78  bool CServicesManager::createServicesOnto(const std::string& poolId, const std::string& serviceId, const std::string& OnServiceId, bool wait)
79  {
80
81    int leader ;
82    int poolSize ;
83   
84    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ;
85    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
86    if (wait)
87    {
88      while (!ok) 
89      {
90        CXios::getDaemonsManager()->eventLoop() ;
91        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
92      }
93    }
94
95    if (ok) 
96    {
97      info(40)<<"CServicesManager : create service on other, notification to leader "<<leader<<", serviceId : "<<serviceId<<", service onto : "<<OnServiceId<<endl ;
98      createServicesOntoNotify(leader, serviceId, OnServiceId) ;
99      return true ;
100    }
101    else return false ;
102  }
103
104  void CServicesManager::createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions)
105  {
106    notifyType_=NOTIFY_CREATE_SERVICE ;
107    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions ) ;
108    sendNotification(rank) ;
109  }
110
111
112  void CServicesManager::createServicesOntoNotify(int rank, const string& serviceId, const string& OnServiceId)
113  {
114    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
115    notifyCreateServiceOnto_=make_tuple(serviceId, OnServiceId) ;
116    sendNotification(rank) ;
117  }
118
119  void CServicesManager::sendNotification(int rank)
120  {
121    winNotify_->lockWindowExclusive(rank) ;
122    winNotify_->pushToLockedWindow(rank, this, &CServicesManager::notificationsDumpOut) ;
123    winNotify_->unlockWindow(rank) ;
124  }
125
126 
127  void CServicesManager::eventLoop(void)
128  {
129    CTimer::get("CServicesManager::eventLoop").resume();
130    int flag ;
131    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
132    double time=MPI_Wtime() ;
133    if (time-lastEventLoop_ > eventLoopLatency_) 
134    {
135      checkNotifications() ;
136      lastEventLoop_=time ;
137    }
138    CTimer::get("CServicesManager::eventLoop").suspend();
139  }
140
141
142
143  void CServicesManager::checkNotifications(void)
144  {
145    int commRank ;
146    MPI_Comm_rank(xiosComm_, &commRank) ;
147    winNotify_->lockWindowExclusive(commRank) ;
148    winNotify_->popFromLockedWindow(commRank, this, &CServicesManager::notificationsDumpIn) ;
149    winNotify_->unlockWindow(commRank) ;
150    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
151    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
152  }
153
154  void CServicesManager::createService(void)
155  {
156    auto& arg=notifyCreateService_ ;
157    CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
158  }
159
160  void CServicesManager::createServiceOnto(void)
161  {
162    auto& arg=notifyCreateService_ ;
163    //CServer::getServersRessource()->getPoolRessource()->createService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
164  }
165
166  void CServicesManager::notificationsDumpOut(CBufferOut& buffer)
167  {
168
169    buffer.realloc(maxBufferSize_) ;
170   
171    if (notifyType_==NOTIFY_CREATE_SERVICE)
172    {
173      auto& arg=notifyCreateService_ ;
174      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
175    }
176    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
177    {
178      auto& arg=notifyCreateServiceOnto_ ;
179      buffer << notifyType_<< get<0>(arg) << get<1>(arg)  ;
180    }
181  }
182
183  void CServicesManager::notificationsDumpIn(CBufferIn& buffer)
184  {
185    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
186    else
187    {
188      buffer>>notifyType_;
189      if (notifyType_==NOTIFY_CREATE_SERVICE)
190      {
191        auto& arg=notifyCreateService_ ;
192        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
193      }
194      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
195      {
196        auto& arg=notifyCreateServiceOnto_ ;
197        buffer >> get<0>(arg) >> get<1>(arg) ;
198      }
199    }
200  } 
201 
202  void CServicesManager::servicesDumpOut(CBufferOut& buffer)
203  {
204   
205    buffer.realloc(maxBufferSize_) ;
206   
207    buffer<<(int)services_.size();
208   
209    for(auto it=services_.begin();it!=services_.end(); ++it)
210    { 
211      auto key = it->first ;
212      auto val = it->second ; 
213      buffer << std::get<0>(key) << std::get<1>(key) << std::get<2>(key) 
214             <<  static_cast<int>(std::get<0>(val)) << std::get<1>(val) << std::get<2>(val) << std::get<3>(val) ;
215    }
216  }
217
218  void CServicesManager::servicesDumpIn(CBufferIn& buffer)
219  {
220    std::string poolId, serviceId ;
221    int partitionId ;
222    int type ;
223    int size; 
224    int nbPartitions ;
225    int leader ;
226
227    services_.clear() ;
228    int nbServices ;
229    buffer>>nbServices ;
230    for(int i=0;i<nbServices;i++) 
231    {
232      buffer>>poolId>>serviceId>>partitionId>>type>>size>>nbPartitions>>leader ;
233      services_[std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
234    }
235  }
236
237  void CServicesManager::registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, 
238                                         int size, int nbPartitions, int leader)
239  {
240   
241    info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ;
242
243    winServices_->lockWindowExclusive(managerGlobalLeader_) ;
244    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
245    winServices_->flushWindow(managerGlobalLeader_) ;
246    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
247    winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ;
248    winServices_->unlockWindow(managerGlobalLeader_) ;
249
250  }
251
252  bool CServicesManager::getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, 
253                                        int& size, int& nbPartitions, int& leader)
254  {
255   
256    winServices_->lockWindowShared(managerGlobalLeader_) ;
257    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
258    winServices_->unlockWindow(managerGlobalLeader_) ;
259
260    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ;
261    if ( it == services_.end()) return false ;
262    else
263    {
264      type= std::get<0>(it->second); 
265      size= std::get<1>(it->second); 
266      nbPartitions = std::get<2>(it->second); 
267      leader = std::get<3>(it->second); 
268      return true ;
269    }
270  }
271
272  bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader)
273  {
274    int type;
275    int size ;
276    int nbPartitions;
277    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
278  }
279
280  bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type)
281  {
282    int size ;
283    int nbPartitions;
284    int leader;
285    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
286  }
287
288  bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions)
289  {
290    int size ;
291    int type;
292    int leader;
293    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
294  }
295
296  bool CServicesManager::hasService(const std::string& poolId, const std::string& serviceId, const int& partitionId)
297  {
298    winServices_->lockWindow(managerGlobalLeader_,0) ;
299    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
300    winServices_->unlockWindow(managerGlobalLeader_,0) ;
301    auto it=services_.find(std::tuple<std::string, std::string, int>(poolId, serviceId, partitionId)) ;
302    if ( it == services_.end()) return false ;
303    else return true ;
304  }
305
306}
Note: See TracBrowser for help on using the repository browser.