source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services_manager.cpp @ 2260

Last change on this file since 2260 was 2260, checked in by ymipsl, 3 years ago

Improvment of one sided protocol

  • removed latency
  • solve dead-lock

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.2 KB
Line 
1#include "daemons_manager.hpp"
2#include "services_manager.hpp"
3#include "ressources_manager.hpp"
4#include "cxios.hpp"
5#include "pool_ressource.hpp"
6#include "type.hpp"
7#include "server.hpp"
8#include "servers_ressource.hpp"
9#include "timer.hpp"
10
11namespace xios
12{
13
14 
15
16  CServicesManager::CServicesManager(bool isXiosServer)
17  {
18   
19    int commRank ; 
20    xiosComm_ = CXios::getXiosComm() ;
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22   
23   
24    // The global manager leader will be the process of rank 0
25    // By "xiosComm" communicator construction
26    // - if servers exits it will be the root process of the servers communicator
27    // - otherwise the root process of the first model
28   
29    managerGlobalLeader_ = 0 ;
30
31    MPI_Comm_rank(xiosComm_, &commRank) ;
32    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
33    winNotify_->lockWindow(commRank,0) ;
34    winNotify_->updateToWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;
35    winNotify_->unlockWindow(commRank,0) ;
36
37    winServices_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
38    winServices_->lockWindow(commRank,0) ;
39    winServices_->updateToWindow(commRank, this, &CServicesManager::servicesDumpOut) ;
40    winServices_->unlockWindow(commRank,0) ;
41
42    MPI_Barrier(xiosComm_)  ;   
43  }
44
45  CServicesManager::~CServicesManager()
46  {
47    delete winNotify_ ;
48    delete winServices_ ;
49  }
50
51  bool CServicesManager::createServices(const std::string& poolId, const std::string& serviceId, 
52                                        int type, int size, int nbPartitions, bool wait) 
53  {
54
55    int leader ;
56    int poolSize ;
57   
58    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ;
59    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
60    if (wait)
61    {
62      while (!ok) 
63      {
64        CXios::getDaemonsManager()->eventLoop() ;
65        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ;
66      }
67    }
68
69    if (ok) 
70    {
71      info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ;
72      createServicesNotify(leader, serviceId, type, size, nbPartitions) ;
73      return true ;
74    }
75    else return false ;
76  }
77
78
79  void CServicesManager::createServicesNotify(int rank, const string& serviceId, int type, int size, int nbPartitions)
80  {
81    winNotify_->lockWindow(rank,0) ;
82    winNotify_->updateFromWindow(rank, this, &CServicesManager::notificationsDumpIn) ;
83    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions)) ;
84    winNotify_->updateToWindow(rank, this, &CServicesManager::notificationsDumpOut) ;
85    winNotify_->unlockWindow(rank,0) ;
86  }
87
88 
89  void CServicesManager::checkCreateServicesNotification(void)
90  {
91    int commRank ;
92    MPI_Comm_rank(xiosComm_,&commRank) ;
93    winNotify_->lockWindow(commRank,0) ;
94    winNotify_->updateFromWindow(commRank, this, &CServicesManager::notificationsDumpIn) ;
95   
96    if (!notifications_.empty())
97    {
98      auto info = notifications_.front() ;
99      xios::info(40)<<"CServicesManager : receive create service notification : "<<get<0>(info)<<endl ;
100      CServer::getServersRessource()->getPoolRessource()->createService(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
101      notifications_.pop_front() ;
102      winNotify_->updateToWindow(commRank, this, &CServicesManager::notificationsDumpOut) ;     
103    }
104    winNotify_->unlockWindow(commRank,0) ;
105
106  }
107
108  void CServicesManager::eventLoop(void)
109  {
110    CTimer::get("CServicesManager::eventLoop").resume();
111    int flag ;
112    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
113    double time=MPI_Wtime() ;
114    if (time-lastEventLoop_ > eventLoopLatency_) 
115    {
116      checkCreateServicesNotification() ;
117      lastEventLoop_=time ;
118    }
119    CTimer::get("CServicesManager::eventLoop").suspend();
120  }
121
122 
123  void CServicesManager::notificationsDumpOut(CBufferOut& buffer)
124  {
125   
126    buffer.realloc(maxBufferSize_) ;
127   
128    buffer<<(int)notifications_.size();
129   
130    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
131      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it) << std::get<3>(*it)  ;
132  }
133
134  void CServicesManager::notificationsDumpIn(CBufferIn& buffer)
135  {
136    std::string id ;
137    int type ;
138    int size; 
139    int nbPartitions ;
140
141    notifications_.clear() ;
142    int nbNotifications ;
143    buffer>>nbNotifications ;
144    for(int i=0;i<nbNotifications;i++) 
145    {
146      buffer>>id>>type>>size>>nbPartitions ;
147      notifications_.push_back(std::make_tuple(id,type,size,nbPartitions)) ;
148    }
149  }
150
151 
152  void CServicesManager::servicesDumpOut(CBufferOut& buffer)
153  {
154   
155    buffer.realloc(maxBufferSize_) ;
156   
157    buffer<<(int)services_.size();
158   
159    for(auto it=services_.begin();it!=services_.end(); ++it)
160    { 
161      auto key = it->first ;
162      auto val = it->second ; 
163      buffer << std::get<0>(key) << std::get<1>(key) << std::get<2>(key) 
164             <<  static_cast<int>(std::get<0>(val)) << std::get<1>(val) << std::get<2>(val) << std::get<3>(val) ;
165    }
166  }
167
168  void CServicesManager::servicesDumpIn(CBufferIn& buffer)
169  {
170    std::string poolId, serviceId ;
171    int partitionId ;
172    int type ;
173    int size; 
174    int nbPartitions ;
175    int leader ;
176
177    services_.clear() ;
178    int nbServices ;
179    buffer>>nbServices ;
180    for(int i=0;i<nbServices;i++) 
181    {
182      buffer>>poolId>>serviceId>>partitionId>>type>>size>>nbPartitions>>leader ;
183      services_[std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
184    }
185  }
186
187  void CServicesManager::registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, 
188                                         int size, int nbPartitions, int leader)
189  {
190   
191    info(40)<<"CServicesManager : registering service, poolId : "<<poolId<<", serviceId : "<<serviceId<<endl ; ;
192
193    winServices_->lockWindowExclusive(managerGlobalLeader_) ;
194    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
195    winServices_->flushWindow(managerGlobalLeader_) ;
196    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
197    winServices_->updateToLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ;
198    winServices_->unlockWindow(managerGlobalLeader_) ;
199
200/*
201    winServices_->lockWindow(managerGlobalLeader_,0) ;
202    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
203    services_[std::tuple<std::string, std::string,int>(poolId,serviceId,partitionId)]=std::make_tuple(type,size,nbPartitions,leader) ;
204    winServices_->updateToWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpOut) ;
205    winServices_->unlockWindow(managerGlobalLeader_,0) ;*/
206  }
207
208  bool CServicesManager::getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, 
209                                        int& size, int& nbPartitions, int& leader)
210  {
211   
212    winServices_->lockWindowShared(managerGlobalLeader_) ;
213    winServices_->updateFromLockedWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
214    winServices_->unlockWindow(managerGlobalLeader_) ;
215/*
216    winServices_->lockWindow(managerGlobalLeader_,0) ;
217    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
218    winServices_->unlockWindow(managerGlobalLeader_,0) ;*/
219
220    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ;
221    if ( it == services_.end()) return false ;
222    else
223    {
224      type= std::get<0>(it->second); 
225      size= std::get<1>(it->second); 
226      nbPartitions = std::get<2>(it->second); 
227      leader = std::get<3>(it->second); 
228      return true ;
229    }
230  }
231
232  bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader)
233  {
234    int type;
235    int size ;
236    int nbPartitions;
237    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
238  }
239
240  bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type)
241  {
242    int size ;
243    int nbPartitions;
244    int leader;
245    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
246  }
247
248  bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions)
249  {
250    int size ;
251    int type;
252    int leader;
253    return getServiceInfo(poolId, serviceId, partitionId, type, size, nbPartitions, leader) ;
254  }
255
256  bool CServicesManager::hasService(const std::string& poolId, const std::string& serviceId, const int& partitionId)
257  {
258    winServices_->lockWindow(managerGlobalLeader_,0) ;
259    winServices_->updateFromWindow(managerGlobalLeader_, this, &CServicesManager::servicesDumpIn) ;
260    winServices_->unlockWindow(managerGlobalLeader_,0) ;
261    auto it=services_.find(std::tuple<std::string, std::string, int>(poolId, serviceId, partitionId)) ;
262    if ( it == services_.end()) return false ;
263    else return true ;
264  }
265
266}
Note: See TracBrowser for help on using the repository browser.