source: XIOS3/trunk/src/manager/contexts_manager.cpp @ 2613

Last change on this file since 2613 was 2613, checked in by jderouillat, 2 months ago

Fix the attached mode for scalar output, and some bugs revealed by the adastra porting in debug mode

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.6 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include "timer.hpp"
10#include <functional>
11
12
13namespace xios
14{
15  using namespace std ;
16
17  CContextsManager::CContextsManager(bool isXiosServer)
18  {
19    xiosComm_ = CXios::getXiosComm()  ;
20   
21    int commRank ; 
22    MPI_Comm_rank(xiosComm_, &commRank) ;
23    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
24    else commRank=0 ;
25    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
26
27    MPI_Comm_rank(xiosComm_, &commRank) ;
28    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_,"CContextsManager::winNotify_") ;
29    winNotify_->updateToExclusiveWindow(commRank, this, &CContextsManager::notificationsDumpOut) ;
30   
31
32    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_,"CContextsManager::winContexts_") ;
33    winContexts_->updateToExclusiveWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
34 
35    MPI_Barrier(xiosComm_)  ;   
36  }
37
38
39  CContextsManager::~CContextsManager()
40  {
41    delete winNotify_ ;
42    delete winContexts_ ;
43  }
44
45  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
46                                             const string& contextId, bool wait)
47  {
48    int serviceLeader ;
49    auto servicesManager = CXios::getServicesManager() ;
50   
51    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
52
53    info(40)<<"CContextsManager::createServerContext : waiting for service leader ;  serviceId : "<<serviceId<<endl ;
54    if (wait)
55    {
56      while (!ok) 
57      {
58        CXios::getDaemonsManager()->servicesEventLoop() ;
59        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
60      }
61    }
62
63    if (ok) 
64    {
65      notifyType_=NOTIFY_CREATE_CONTEXT ;
66      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
67      info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ;
68      sendNotification(serviceLeader) ;
69      return true ;
70    }
71    else return false ;
72  }
73
74
75  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
76                                                      const string& contextId, const string& sourceContext, bool wait)
77  {
78    int contextLeader ;
79    bool ok ;
80    int remoteLeader ;
81    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
82   
83    int type ;
84    info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ;  contextId : "<<contextId<<endl ;
85    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
86    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
87    if (wait)
88    {
89      while (!ok) 
90      {
91        CXios::getDaemonsManager()->servicesEventLoop() ;
92        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
93        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
94      }
95    }
96   
97    if (ok) 
98    {
99      notifyType_=NOTIFY_CREATE_INTERCOMM ;
100      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
101      info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ;
102      sendNotification(contextLeader) ;
103      return true ;
104    }
105    else return false ;
106  }
107
108  void CContextsManager::sendNotification(int rank)
109  {
110    winNotify_->lockWindowExclusive(rank) ;
111    winNotify_->pushToLockedWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
112    winNotify_->unlockWindowExclusive(rank) ;
113  }
114
115 
116  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
117  {
118   
119    buffer.realloc(maxBufferSize_) ;
120   
121    if (notifyType_==NOTIFY_CREATE_CONTEXT)
122    {
123      auto& arg=notifyCreateContext_ ;
124      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
125    }
126    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
127    {
128      auto& arg=notifyCreateIntercomm_ ;
129      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
130    }
131  }
132
133  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
134  {
135    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
136    else
137    {
138      buffer>>notifyType_;
139      if (notifyType_==NOTIFY_CREATE_CONTEXT)
140      {
141        auto& arg=notifyCreateContext_ ;
142        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
143      }
144      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
145      {
146        auto& arg=notifyCreateIntercomm_ ;
147        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
148      }
149    }
150
151  }
152
153  void CContextsManager::eventLoop(void)
154  {
155    CTimer::get("CContextsManager::eventLoop").resume();
156    int flag ;
157    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
158    double time=MPI_Wtime() ;
159    if (time-lastEventLoop_ > eventLoopLatency_) 
160    {
161      checkNotifications() ;
162      lastEventLoop_=time ;
163    }
164    CTimer::get("CContextsManager::eventLoop").suspend();
165  }
166 
167  void CContextsManager::checkNotifications(void)
168  {
169    int commRank ;
170    MPI_Comm_rank(xiosComm_, &commRank) ;
171    winNotify_->popFromExclusiveWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
172    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
173    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
174
175  }
176
177  void CContextsManager::createServerContext(void)
178  {
179    info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ;
180    auto arg=notifyCreateContext_ ;
181    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
182                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
183 
184  }
185
186  void CContextsManager::createServerContextIntercomm(void)
187  {
188    info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ;
189    auto arg=notifyCreateIntercomm_ ;
190    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
191                             ->getServerContext(get<3>(arg))
192                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
193  }             
194
195  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
196                                                const int& type, const string& contextId)
197  {
198    if (type==CServicesManager::CLIENT) return contextId;
199    else
200    {
201      ostringstream oss;
202      oss<<partitionId;
203      return poolId+"__"+serviceId+"_"+oss.str()+"__"+contextId;
204    }
205  }
206
207  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
208  {
209    winContexts_->lockWindowExclusive(managerGlobalLeader_) ;
210    winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
211    winContexts_->flushWindow(managerGlobalLeader_) ;
212    contexts_[fullContextId] = contextInfo ;
213    winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
214    winContexts_->unlockWindowExclusive(managerGlobalLeader_) ;
215  }
216
217  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
218  {
219    bool ret ;
220    int commRank=0 ;
221    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
222
223    if (commRank==0)
224    {
225
226      winContexts_->updateFromSharedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
227
228      auto it=contexts_.find(fullContextId) ;
229      if ( it == contexts_.end()) ret=false ;
230      else
231      {
232        contextInfo=it->second ; 
233        ret=true ;
234      }
235    }
236   
237    if (comm!=MPI_COMM_NULL) 
238    {
239      int cast_ret = 0;
240      if (commRank==0) cast_ret = ret;
241      MPI_Bcast(&cast_ret,1,MPI_INT,0,comm) ;
242      ret = cast_ret;
243      if (ret)
244      {
245        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
246        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
247        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
248        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
249        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
250        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
251        MPI_Bcast_string(contextInfo.id,0,comm) ;
252      }
253    }
254    return ret ;
255  }
256
257  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
258  {
259    SRegisterContextInfo contextInfo ;
260    bool ret=getContextInfo(fullContextId, contextInfo) ;
261    if (ret) leader=contextInfo.leader ;
262    return ret ;
263  }
264
265  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
266  {
267
268    SRegisterContextInfo contextInfo ;
269    bool ret=getContextInfo(fullContextId, contextInfo) ;
270    if (ret) size=contextInfo.size ;
271    return ret ;
272  }
273
274  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
275  {
276    SRegisterContextInfo contextInfo ;
277    bool ret=getContextInfo(fullContextId, contextInfo) ;
278    if (ret) poolId=contextInfo.poolId ;
279    return ret ;
280  }
281
282  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
283  {
284    SRegisterContextInfo contextInfo ;
285    bool ret=getContextInfo(fullContextId, contextInfo) ;
286    if (ret) serviceId=contextInfo.serviceId ;
287    return ret ;
288  }
289
290  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
291  {
292    SRegisterContextInfo contextInfo ;
293    bool ret=getContextInfo(fullContextId, contextInfo) ;
294    if (ret) partitionId=contextInfo.partitionId ;
295    return ret ;
296  }
297 
298  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
299  {
300    SRegisterContextInfo contextInfo ;
301    bool ret=getContextInfo(fullContextId, contextInfo) ;
302    if (ret) serviceType=contextInfo.serviceType ;
303    return ret ;
304  }
305
306  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
307  {
308    SRegisterContextInfo contextInfo ;
309    bool ret=getContextInfo(fullContextId, contextInfo) ;
310    if (ret) contextId=contextInfo.id ;
311    return ret ;
312  }
313
314
315  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
316  {
317    SRegisterContextInfo contextInfo ;
318    return getContextInfo(fullContextId, contextInfo) ;
319  }
320
321  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
322  {
323    buffer.realloc(maxBufferSize_) ;
324    buffer<<(int)contexts_.size();
325   
326    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
327    { 
328      auto key = it->first ;
329      auto val = it->second ; 
330      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
331    }
332  } 
333
334  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
335  {
336    std::string contextId ;
337    SRegisterContextInfo ci;
338    int size; 
339    int leader ;
340
341    contexts_.clear() ;
342    int nbContexts ;
343    buffer>>nbContexts ;
344    for(int i=0;i<nbContexts;i++) 
345    {
346      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
347      contexts_[contextId]=ci ;
348    }
349
350  }
351}
Note: See TracBrowser for help on using the repository browser.