source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/ressources_manager.cpp @ 2246

Last change on this file since 2246 was 2246, checked in by ymipsl, 3 years ago
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.5 KB
Line 
1#include "ressources_manager.hpp"
2#include "server.hpp"
3#include "servers_ressource.hpp"
4#include "timer.hpp"
5
6
7
8
9
10namespace xios
11{
12  using namespace std;
13
14  CRessourcesManager::CRessourcesManager(bool isXiosServer) 
15  {
16   
17    xiosComm_ = CXios::getXiosComm()  ;
18   
19    int commRank ; 
20    MPI_Comm_rank(xiosComm_, &commRank) ;
21    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
22    else commRank=0 ;
23    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
24
25    MPI_Comm_rank(xiosComm_, &commRank) ;
26    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
27   
28
29    winRessources_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
30    winRessources_->lockWindow(commRank,0) ;
31    serverLeader_=-1 ;
32    winRessources_->updateToWindow(commRank, this, &CRessourcesManager::ressourcesDumpOut) ;
33    winRessources_->unlockWindow(commRank,0) ;
34
35    MPI_Barrier(xiosComm_)  ;   
36  }
37 
38  CRessourcesManager::~CRessourcesManager()
39  {
40    delete winNotify_ ;
41    delete winRessources_ ;
42  } 
43
44  void CRessourcesManager::createPool(const string& poolId, int size)
45  {
46    info(40)<<"CRessourcesManager::createPool : calling createPool : "<<poolId<<"  of size"<<size<<endl ;
47    info(40)<<"send notification to leader : "<<serverLeader_<<endl ;
48    winRessources_->lockWindow(managerGlobalLeader_,0) ;
49    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
50    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
51   
52    notifyType_=NOTIFY_CREATE_POOL ;
53    notifyCreatePool_=make_tuple(poolId, size) ;
54    info(40)<<"CRessourcesManager::createPool : send notification creating pool to server leader "<<serverLeader_<<endl ;
55    sendNotification(serverLeader_) ; 
56  }
57 
58  void CRessourcesManager::finalize(void)
59  {
60    winRessources_->lockWindow(managerGlobalLeader_,0) ;
61    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
62    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
63   
64    if (serverLeader_!=-1)
65    {
66      notifyType_=NOTIFY_FINALIZE ;
67      info(40)<<"CRessourcesManager::finalize : send notification finalize to server leader "<<serverLeader_<<endl ;
68      sendNotification(serverLeader_) ;
69    } 
70  }
71
72  void CRessourcesManager::sendNotification(int rank)
73  {
74    winNotify_->lockWindow(rank,0) ;
75    winNotify_->pushToWindow(rank, this, &CRessourcesManager::notificationsDumpOut) ;
76    winNotify_->unlockWindow(rank,0) ;
77  }
78
79 
80  void CRessourcesManager::notificationsDumpOut(CBufferOut& buffer)
81  {
82   
83    buffer.realloc(maxBufferSize_) ;
84   
85    if (notifyType_==NOTIFY_CREATE_POOL)
86    {
87      auto& arg=notifyCreatePool_ ;
88      buffer << notifyType_<< get<0>(arg) << get<1>(arg) ;
89    }
90    else if (notifyType_==NOTIFY_FINALIZE)
91    {
92      buffer << notifyType_ ;
93    }
94  }
95
96  void CRessourcesManager::notificationsDumpIn(CBufferIn& buffer)
97  {
98    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
99    else
100    {
101      buffer>>notifyType_;
102      if (notifyType_==NOTIFY_CREATE_POOL)
103      {
104        auto& arg=notifyCreatePool_ ;
105        buffer >> get<0>(arg) >> get<1>(arg)  ;
106      }
107      else if (notifyType_==NOTIFY_FINALIZE) { /*nothing to do*/ }
108    }
109
110  }
111
112  void CRessourcesManager::eventLoop(void)
113  {
114    CTimer::get("CRessourcesManager::eventLoop").resume();
115    double time=MPI_Wtime() ;
116    if (time-lastEventLoop_ > eventLoopLatency_) 
117    {
118      checkNotifications() ;
119      lastEventLoop_=time ;
120    }
121
122    CTimer::get("CRessourcesManager::eventLoop").suspend();
123  }
124 
125  void CRessourcesManager::checkNotifications(void)
126  {
127    int commRank ;
128    MPI_Comm_rank(xiosComm_, &commRank) ;
129    CTimer::get("CRessourcesManager::checkNotifications lock").resume();
130    winNotify_->lockWindow(commRank,0) ;
131    CTimer::get("CRessourcesManager::checkNotifications lock").suspend();
132    CTimer::get("CRessourcesManager::checkNotifications pop").resume();
133    winNotify_->popFromWindow(commRank, this, &CRessourcesManager::notificationsDumpIn) ;
134    CTimer::get("CRessourcesManager::checkNotifications pop").suspend();
135    CTimer::get("CRessourcesManager::checkNotifications unlock").resume();
136    winNotify_->unlockWindow(commRank,0) ;
137    CTimer::get("CRessourcesManager::checkNotifications unlock").suspend();
138    if (notifyType_==NOTIFY_CREATE_POOL) createPool() ;
139    else if (notifyType_==NOTIFY_FINALIZE) finalizeSignal() ;
140  }
141
142  void CRessourcesManager::createPool(void)
143  {
144   
145    auto& arg=notifyCreatePool_ ;
146    string poolId=get<0>(arg) ;
147    int size=get<1>(arg) ;
148    info(40)<<"CRessourcesManager::createPool : receive create pool notification : "<< poolId<<"  of size "<<size<<endl ;
149    CServer::getServersRessource()->createPool(poolId,size) ;
150  } 
151
152  void CRessourcesManager::finalizeSignal(void)
153  {
154    info(40)<<"CRessourcesManager::createPool : receive finalize notification"<<endl ;
155    CServer::getServersRessource()->finalize() ;
156  }
157
158  void CRessourcesManager::ressourcesDumpOut(CBufferOut& buffer)
159  {
160   
161    buffer.realloc(maxBufferSize_) ;
162   
163    buffer<<serverLeader_ ; 
164    buffer<<(int) pools_.size();
165    for(auto it=pools_.begin();it!=pools_.end(); ++it)
166    { 
167      auto key = it->first ;
168      auto val = it->second ; 
169      buffer << key<<std::get<0>(val) << std::get<1>(val)  ;
170    }
171  }
172
173  void CRessourcesManager::ressourcesDumpIn(CBufferIn& buffer)
174  {
175    std::string poolId ;
176    int size ;
177    int leader ;
178   
179    buffer>>serverLeader_ ;
180    pools_.clear() ;
181    int nbPools ;
182    buffer>>nbPools ;
183    for(int i=0;i<nbPools;i++) 
184    {
185      buffer>>poolId>>size>>leader ;
186      pools_[poolId]=std::make_tuple(size,leader) ;
187    }
188  }
189 
190  void CRessourcesManager::registerServerLeader(int serverLeaderRank)
191  {
192    winRessources_->lockWindow(managerGlobalLeader_,0) ;
193    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
194    serverLeader_ = serverLeaderRank ;
195    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
196    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
197  }
198 
199  void CRessourcesManager::registerRessourcesSize(int size)
200  {
201    winRessources_->lockWindow(managerGlobalLeader_,0) ;
202    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
203    ressourcesSize_ = size ;
204    freeRessourcesSize_ = size ;
205    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
206    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
207  }
208
209 
210  void CRessourcesManager::registerPool(const string& poolId, int size, int leader)
211  {
212    winRessources_->lockWindow(managerGlobalLeader_,0) ;
213    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
214    pools_[poolId] = make_tuple(size,leader) ;
215    freeRessourcesSize_-=size ;
216    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ;
217    winRessources_->unlockWindow(managerGlobalLeader_,0) ;   
218  }
219
220
221  bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& leader)
222  {
223    winRessources_->lockWindow(managerGlobalLeader_,0) ;
224    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
225    winRessources_->unlockWindow(managerGlobalLeader_,0) ;
226
227    auto it=pools_.find(poolId) ;
228    if ( it == pools_.end()) return false ;
229    else
230    {
231      size=get<0>(it->second) ;
232      leader=get<1>(it->second) ;
233      return true ;
234    }
235  }
236
237  int CRessourcesManager::getRessourcesSize(void)
238  {
239    winRessources_->lockWindow(managerGlobalLeader_,0) ;
240    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
241    winRessources_->unlockWindow(managerGlobalLeader_,0) ;
242
243    return ressourcesSize_ ;
244  }
245
246  int CRessourcesManager::getFreeRessourcesSize(void)
247  {
248    winRessources_->lockWindow(managerGlobalLeader_,0) ;
249    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ;
250    winRessources_->unlockWindow(managerGlobalLeader_,0) ;
251
252    return freeRessourcesSize_ ;
253  } 
254
255  bool CRessourcesManager::getPoolLeader(const string& poolId, int& leader)
256  {
257    int size ;
258    return getPoolInfo(poolId, size, leader) ;
259  }
260
261  bool CRessourcesManager::getPoolSize(const string& poolId, int& size)
262  {
263    int leader ;
264    return getPoolInfo(poolId, size, leader) ;
265  }
266
267  bool CRessourcesManager::hasPool(const string& poolId)
268  {
269    int leader,size ;
270    return getPoolInfo(poolId, size, leader) ;
271  }
272}
Note: See TracBrowser for help on using the repository browser.