source: XIOS3/trunk/src/manager/contexts_manager.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 6 weeks ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.7 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include "timer.hpp"
10#include <functional>
11
12
13namespace xios
14{
15  using namespace std ;
16  extern CLogType logTimers ;
17
18  CContextsManager::CContextsManager(bool isXiosServer)
19  {
20    xiosComm_ = CXios::getXiosComm()  ;
21   
22    int commRank ; 
23    MPI_Comm_rank(xiosComm_, &commRank) ;
24    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
25    else commRank=0 ;
26    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
27
28    MPI_Comm_rank(xiosComm_, &commRank) ;
29    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_,"CContextsManager::winNotify_") ;
30    winNotify_->updateToExclusiveWindow(commRank, this, &CContextsManager::notificationsDumpOut) ;
31   
32
33    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_,"CContextsManager::winContexts_") ;
34    winContexts_->updateToExclusiveWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
35 
36    MPI_Barrier(xiosComm_)  ;   
37  }
38
39
40  CContextsManager::~CContextsManager()
41  {
42    delete winNotify_ ;
43    delete winContexts_ ;
44  }
45
46  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
47                                             const string& contextId, bool wait)
48  {
49    int serviceLeader ;
50    auto servicesManager = CXios::getServicesManager() ;
51   
52    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
53
54    info(40)<<"CContextsManager::createServerContext : waiting for service leader ;  serviceId : "<<serviceId<<endl ;
55    if (wait)
56    {
57      while (!ok) 
58      {
59        CXios::getDaemonsManager()->servicesEventLoop() ;
60        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
61      }
62    }
63
64    if (ok) 
65    {
66      notifyType_=NOTIFY_CREATE_CONTEXT ;
67      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
68      info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ;
69      sendNotification(serviceLeader) ;
70      return true ;
71    }
72    else return false ;
73  }
74
75
76  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
77                                                      const string& contextId, const string& sourceContext, bool wait)
78  {
79    int contextLeader ;
80    bool ok ;
81    int remoteLeader ;
82    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
83   
84    int type ;
85    info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ;  contextId : "<<contextId<<endl ;
86    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
87    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
88    if (wait)
89    {
90      while (!ok) 
91      {
92        CXios::getDaemonsManager()->servicesEventLoop() ;
93        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
94        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
95      }
96    }
97   
98    if (ok) 
99    {
100      notifyType_=NOTIFY_CREATE_INTERCOMM ;
101      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
102      info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ;
103      sendNotification(contextLeader) ;
104      return true ;
105    }
106    else return false ;
107  }
108
109  void CContextsManager::sendNotification(int rank)
110  {
111    winNotify_->lockWindowExclusive(rank) ;
112    winNotify_->pushToLockedWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
113    winNotify_->unlockWindowExclusive(rank) ;
114  }
115
116 
117  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
118  {
119   
120    buffer.realloc(maxBufferSize_) ;
121   
122    if (notifyType_==NOTIFY_CREATE_CONTEXT)
123    {
124      auto& arg=notifyCreateContext_ ;
125      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
126    }
127    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
128    {
129      auto& arg=notifyCreateIntercomm_ ;
130      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
131    }
132  }
133
134  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
135  {
136    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
137    else
138    {
139      buffer>>notifyType_;
140      if (notifyType_==NOTIFY_CREATE_CONTEXT)
141      {
142        auto& arg=notifyCreateContext_ ;
143        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
144      }
145      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
146      {
147        auto& arg=notifyCreateIntercomm_ ;
148        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
149      }
150    }
151
152  }
153
154  void CContextsManager::eventLoop(void)
155  {
156    if (info.isActive(logTimers)) CTimer::get("CContextsManager::eventLoop").resume();
157    int flag ;
158    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
159    double time=MPI_Wtime() ;
160    if (time-lastEventLoop_ > eventLoopLatency_) 
161    {
162      checkNotifications() ;
163      lastEventLoop_=time ;
164    }
165    if (info.isActive(logTimers)) CTimer::get("CContextsManager::eventLoop").suspend();
166  }
167 
168  void CContextsManager::checkNotifications(void)
169  {
170    int commRank ;
171    MPI_Comm_rank(xiosComm_, &commRank) ;
172    winNotify_->popFromExclusiveWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
173    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
174    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
175
176  }
177
178  void CContextsManager::createServerContext(void)
179  {
180    info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ;
181    auto arg=notifyCreateContext_ ;
182    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
183                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
184 
185  }
186
187  void CContextsManager::createServerContextIntercomm(void)
188  {
189    info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ;
190    auto arg=notifyCreateIntercomm_ ;
191    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
192                             ->getServerContext(get<3>(arg))
193                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
194  }             
195
196  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
197                                                const int& type, const string& contextId)
198  {
199    if (type==CServicesManager::CLIENT) return contextId;
200    else
201    {
202      ostringstream oss;
203      oss<<partitionId;
204      return poolId+"__"+serviceId+"_"+oss.str()+"__"+contextId;
205    }
206  }
207
208  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
209  {
210    winContexts_->lockWindowExclusive(managerGlobalLeader_) ;
211    winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
212    winContexts_->flushWindow(managerGlobalLeader_) ;
213    contexts_[fullContextId] = contextInfo ;
214    winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
215    winContexts_->unlockWindowExclusive(managerGlobalLeader_) ;
216  }
217
218  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
219  {
220    bool ret ;
221    int commRank=0 ;
222    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
223
224    if (commRank==0)
225    {
226
227      winContexts_->updateFromSharedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
228
229      auto it=contexts_.find(fullContextId) ;
230      if ( it == contexts_.end()) ret=false ;
231      else
232      {
233        contextInfo=it->second ; 
234        ret=true ;
235      }
236    }
237   
238    if (comm!=MPI_COMM_NULL) 
239    {
240      int cast_ret = 0;
241      if (commRank==0) cast_ret = ret;
242      MPI_Bcast(&cast_ret,1,MPI_INT,0,comm) ;
243      ret = cast_ret;
244      if (ret)
245      {
246        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
247        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
248        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
249        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
250        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
251        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
252        MPI_Bcast_string(contextInfo.id,0,comm) ;
253      }
254    }
255    return ret ;
256  }
257
258  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
259  {
260    SRegisterContextInfo contextInfo ;
261    bool ret=getContextInfo(fullContextId, contextInfo) ;
262    if (ret) leader=contextInfo.leader ;
263    return ret ;
264  }
265
266  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
267  {
268
269    SRegisterContextInfo contextInfo ;
270    bool ret=getContextInfo(fullContextId, contextInfo) ;
271    if (ret) size=contextInfo.size ;
272    return ret ;
273  }
274
275  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
276  {
277    SRegisterContextInfo contextInfo ;
278    bool ret=getContextInfo(fullContextId, contextInfo) ;
279    if (ret) poolId=contextInfo.poolId ;
280    return ret ;
281  }
282
283  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
284  {
285    SRegisterContextInfo contextInfo ;
286    bool ret=getContextInfo(fullContextId, contextInfo) ;
287    if (ret) serviceId=contextInfo.serviceId ;
288    return ret ;
289  }
290
291  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
292  {
293    SRegisterContextInfo contextInfo ;
294    bool ret=getContextInfo(fullContextId, contextInfo) ;
295    if (ret) partitionId=contextInfo.partitionId ;
296    return ret ;
297  }
298 
299  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
300  {
301    SRegisterContextInfo contextInfo ;
302    bool ret=getContextInfo(fullContextId, contextInfo) ;
303    if (ret) serviceType=contextInfo.serviceType ;
304    return ret ;
305  }
306
307  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
308  {
309    SRegisterContextInfo contextInfo ;
310    bool ret=getContextInfo(fullContextId, contextInfo) ;
311    if (ret) contextId=contextInfo.id ;
312    return ret ;
313  }
314
315
316  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
317  {
318    SRegisterContextInfo contextInfo ;
319    return getContextInfo(fullContextId, contextInfo) ;
320  }
321
322  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
323  {
324    buffer.realloc(maxBufferSize_) ;
325    buffer<<(int)contexts_.size();
326   
327    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
328    { 
329      auto key = it->first ;
330      auto val = it->second ; 
331      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
332    }
333  } 
334
335  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
336  {
337    std::string contextId ;
338    SRegisterContextInfo ci;
339    int size; 
340    int leader ;
341
342    contexts_.clear() ;
343    int nbContexts ;
344    buffer>>nbContexts ;
345    for(int i=0;i<nbContexts;i++) 
346    {
347      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
348      contexts_[contextId]=ci ;
349    }
350
351  }
352}
Note: See TracBrowser for help on using the repository browser.