source: XIOS3/trunk/src/manager/ressources_manager.cpp @ 2458

Last change on this file since 2458 was 2458, checked in by ymipsl, 17 months ago

Merge XIOS_FILE_SERVICE dev branch into trunk

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.3 KB
Line 
1#include "ressources_manager.hpp"
2#include "server.hpp"
3#include "servers_ressource.hpp"
4#include "token_manager.hpp"
5#include "timer.hpp"
6
7
8
9
10
11namespace xios
12{
13  using namespace std;
14
15  CRessourcesManager::CRessourcesManager(bool isXiosServer) 
16  {
17   
18    xiosComm_ = CXios::getXiosComm()  ;
19   
20    int commRank ; 
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
23    else commRank=0 ;
24    tokenManager_ = new CTokenManager(xiosComm_,commRank) ;
25
26    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
27
28    MPI_Comm_rank(xiosComm_, &commRank) ;
29    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
30   
31
32    winRessources_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
33    winRessources_->lockWindow(commRank,0) ;
34    serverLeader_=-1 ;
35    winRessources_->updateToWindow(commRank, this, &CRessourcesManager::ressourcesDumpOut) ;
36    winRessources_->unlockWindow(commRank,0) ;
37
38    MPI_Barrier(xiosComm_)  ;   
39  }
40 
41  CRessourcesManager::~CRessourcesManager()
42  {
43    delete winNotify_ ;
44    delete winRessources_ ;
45  } 
46
47  void CRessourcesManager::createPool(const string& poolId, int size)
48  {
49    info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<"  of size"<<size<<endl ;
50    info(40)<<"send notification to leader : "<<serverLeader_<<endl ;
51    winRessources_->lockWindow(managerGlobalLeader_,0) ;
52    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
53    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
54   
55    notifyType_=NOTIFY_CREATE_POOL ;
56    notifyCreatePool_=make_tuple(poolId, size) ;
57    info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ;
58    sendNotification(serverLeader_) ; 
59  }
60 
61  void CRessourcesManager::finalize(void)
62  {
63    winRessources_->lockWindow(managerGlobalLeader_,0) ;
64    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
65    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
66   
67    if (serverLeader_!=-1)
68    {
69      notifyType_=NOTIFY_FINALIZE ;
70      info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ;
71      sendNotification(serverLeader_) ;
72    } 
73  }
74
75  void CRessourcesManager::sendNotification(int rank)
76  {
77    winNotify_->lockWindowExclusive(rank) ;
78    winNotify_->pushToLockedWindow(rank, this, &CRessourcesManager::notificationsDumpOut) ;
79    winNotify_->unlockWindow(rank) ;
80  }
81
82 
83  void CRessourcesManager::notificationsDumpOut(CBufferOut& buffer)
84  {
85   
86    buffer.realloc(maxBufferSize_) ;
87   
88    if (notifyType_==NOTIFY_CREATE_POOL)
89    {
90      auto& arg=notifyCreatePool_ ;
91      buffer << notifyType_<< get<0>(arg) << get<1>(arg) ;
92    }
93    else if (notifyType_==NOTIFY_FINALIZE)
94    {
95      buffer << notifyType_ ;
96    }
97  }
98
99  void CRessourcesManager::notificationsDumpIn(CBufferIn& buffer)
100  {
101    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
102    else
103    {
104      buffer>>notifyType_;
105      if (notifyType_==NOTIFY_CREATE_POOL)
106      {
107        auto& arg=notifyCreatePool_ ;
108        buffer >> get<0>(arg) >> get<1>(arg)  ;
109      }
110      else if (notifyType_==NOTIFY_FINALIZE) { /*nothing to do*/ }
111    }
112
113  }
114
115  void CRessourcesManager::eventLoop(void)
116  {
117    CTimer::get("CRessourcesManager::eventLoop").resume();
118    int flag ;
119    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
120    double time=MPI_Wtime() ;
121    if (time-lastEventLoop_ > eventLoopLatency_) 
122    {
123      checkNotifications() ;
124      lastEventLoop_=time ;
125    }
126
127    CTimer::get("CRessourcesManager::eventLoop").suspend();
128  }
129 
130  void CRessourcesManager::checkNotifications(void)
131  {
132    int commRank ;
133    MPI_Comm_rank(xiosComm_, &commRank) ;
134    CTimer::get("CRessourcesManager::checkNotifications lock").resume();
135    winNotify_->lockWindowExclusive(commRank) ;
136    CTimer::get("CRessourcesManager::checkNotifications lock").suspend();
137    CTimer::get("CRessourcesManager::checkNotifications pop").resume();
138    winNotify_->popFromLockedWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ;
139    CTimer::get("CRessourcesManager::checkNotifications pop").suspend();
140    CTimer::get("CRessourcesManager::checkNotifications unlock").resume();
141    winNotify_->unlockWindow(commRank) ;
142    CTimer::get("CRessourcesManager::checkNotifications unlock").suspend();
143    if (notifyType_==NOTIFY_CREATE_POOL) createPool() ;
144    else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ;
145  }
146
147  void CRessourcesManager::createPool(void)
148  {
149   
150    auto& arg=notifyCreatePool_ ;
151    string poolId=get<0>(arg) ;
152    int size=get<1>(arg) ;
153    info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<"  of size "<<size<<endl ;
154    CServer::getServersRessource()->createPool(poolId,size) ;
155  } 
156
157  void CRessourcesManager::finalizeSignal(void)
158  {
159    info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ;
160    CServer::getServersRessource()->finalize() ;
161  }
162
163  void CRessourcesManager::ressourcesDumpOut(CBufferOut& buffer)
164  {
165   
166    buffer.realloc(maxBufferSize_) ;
167   
168    buffer<<ressourcesSize_<<freeRessourcesSize_<<serverLeader_ ; 
169    buffer<<(int) pools_.size();
170    for(auto it=pools_.begin();it!=pools_.end(); ++it)
171    { 
172      auto key = it->first ;
173      auto val = it->second ; 
174      buffer << key<<std::get<0>(val)  << std::get<1>(val)  << std::get<2>(val);
175    }
176  }
177
178  void CRessourcesManager::ressourcesDumpIn(CBufferIn& buffer)
179  {
180    std::string poolId ;
181    int size ;
182    int freeSize ;
183    int leader ;
184   
185    buffer>>ressourcesSize_>>freeRessourcesSize_>>serverLeader_ ;
186    pools_.clear() ;
187    int nbPools ;
188    buffer>>nbPools ;
189    for(int i=0;i<nbPools;i++) 
190    {
191      buffer>>poolId>>size>>freeSize>>leader ;
192      pools_[poolId]=std::make_tuple(size, freeSize, leader) ;
193    }
194  }
195 
196  void CRessourcesManager::registerServerLeader(int serverLeaderRank)
197  {
198    winRessources_->lockWindow(managerGlobalLeader_,0) ;
199    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
200    serverLeader_ = serverLeaderRank ;
201    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
202    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
203  }
204 
205  void CRessourcesManager::registerRessourcesSize(int size)
206  {
207    winRessources_->lockWindow(managerGlobalLeader_,0) ;
208    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
209    ressourcesSize_ = size ;
210    freeRessourcesSize_ = size ;
211    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
212    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
213  }
214
215 
216  void CRessourcesManager::registerPoolClient(const string& poolId, int size, int leader)
217  {
218    winRessources_->lockWindow(managerGlobalLeader_,0) ;
219    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
220    pools_[poolId] = make_tuple(size, size, leader) ;
221    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
222    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
223  }
224
225  void CRessourcesManager::registerPoolServer(const string& poolId, int size, int leader)
226  {
227    winRessources_->lockWindow(managerGlobalLeader_,0) ;
228    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
229    pools_[poolId] = make_tuple(size, size, leader) ;
230    freeRessourcesSize_-=size ;
231    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
232    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
233  }
234
235  bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader)
236  {
237    winRessources_->lockWindow(managerGlobalLeader_,0) ;
238    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
239    winRessources_->unlockWindow(managerGlobalLeader_,0) ;
240
241    auto it=pools_.find(poolId) ;
242    if ( it == pools_.end()) return false ;
243    else
244    {
245      size=get<0>(it->second) ;
246      freeSize=get<1>(it->second) ;
247      leader=get<2>(it->second) ;
248      return true ;
249    }
250  }
251
252  bool CRessourcesManager::decreasePoolFreeSize(const string& poolId, int size)
253  {
254    bool ret ;
255
256    winRessources_->lockWindow(managerGlobalLeader_,0) ;
257    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
258   
259
260    auto it=pools_.find(poolId) ;
261   
262    if ( it == pools_.end()) ret=false ;
263    else 
264    {
265      get<1>(it->second)-=size ;
266      ret=true ;
267    }
268    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
269    winRessources_->unlockWindow(managerGlobalLeader_,0) ; 
270
271    return ret ;   
272  }
273
274  int CRessourcesManager::getRessourcesSize(void)
275  {
276    winRessources_->lockWindow(managerGlobalLeader_,0) ;
277    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
278    winRessources_->unlockWindow(managerGlobalLeader_,0) ;
279
280    return ressourcesSize_ ;
281  }
282
283  int CRessourcesManager::getFreeRessourcesSize(void)
284  {
285    winRessources_->lockWindow(managerGlobalLeader_,0) ;
286    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
287    winRessources_->unlockWindow(managerGlobalLeader_,0) ;
288
289    return freeRessourcesSize_ ;
290  } 
291
292  bool CRessourcesManager::getPoolLeader(const string& poolId, int& leader)
293  {
294    int size, freeSize ;
295    return getPoolInfo(poolId, size, freeSize, leader) ;
296  }
297
298  bool CRessourcesManager::getPoolSize(const string& poolId, int& size)
299  {
300    int leader,freeSize ;
301    return getPoolInfo(poolId, size, freeSize, leader) ;
302  }
303
304  bool CRessourcesManager::getPoolFreeSize(const string& poolId, int& freeSize)
305  {
306    int leader,size ;
307    return getPoolInfo(poolId, size, freeSize, leader) ;
308  }
309
310  bool CRessourcesManager::hasPool(const string& poolId)
311  {
312    int leader,size,freeSize ;
313    return getPoolInfo(poolId, size, freeSize, leader) ;
314  }
315
316  void CRessourcesManager::waitPoolRegistration(const string& poolId)
317  {
318    while(!hasPool(poolId)) CXios::getDaemonsManager()->servicesEventLoop() ;
319  }
320}
Note: See TracBrowser for help on using the repository browser.