source: XIOS3/trunk/src/manager/contexts_manager.cpp @ 2580

Last change on this file since 2580 was 2580, checked in by ymipsl, 8 months ago

Tracking unfree MPI windows and communicators.

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.5 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include "timer.hpp"
10#include <functional>
11
12
13namespace xios
14{
15  using namespace std ;
16
17  CContextsManager::CContextsManager(bool isXiosServer)
18  {
19    xiosComm_ = CXios::getXiosComm()  ;
20   
21    int commRank ; 
22    MPI_Comm_rank(xiosComm_, &commRank) ;
23    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
24    else commRank=0 ;
25    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
26
27    MPI_Comm_rank(xiosComm_, &commRank) ;
28    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_,"CContextsManager::winNotify_") ;
29    winNotify_->updateToExclusiveWindow(commRank, this, &CContextsManager::notificationsDumpOut) ;
30   
31
32    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_,"CContextsManager::winContexts_") ;
33    winContexts_->updateToExclusiveWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
34 
35    MPI_Barrier(xiosComm_)  ;   
36  }
37
38
39  CContextsManager::~CContextsManager()
40  {
41    delete winNotify_ ;
42    delete winContexts_ ;
43  }
44
45  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
46                                             const string& contextId, bool wait)
47  {
48    int serviceLeader ;
49    auto servicesManager = CXios::getServicesManager() ;
50   
51    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
52
53    info(40)<<"CContextsManager::createServerContext : waiting for service leader ;  serviceId : "<<serviceId<<endl ;
54    if (wait)
55    {
56      while (!ok) 
57      {
58        CXios::getDaemonsManager()->servicesEventLoop() ;
59        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
60      }
61    }
62
63    if (ok) 
64    {
65      notifyType_=NOTIFY_CREATE_CONTEXT ;
66      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
67      info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ;
68      sendNotification(serviceLeader) ;
69      return true ;
70    }
71    else return false ;
72  }
73
74
75  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
76                                                      const string& contextId, const string& sourceContext, bool wait)
77  {
78    int contextLeader ;
79    bool ok ;
80    int remoteLeader ;
81    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
82   
83    int type ;
84    info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ;  contextId : "<<contextId<<endl ;
85    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
86    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
87    if (wait)
88    {
89      while (!ok) 
90      {
91        CXios::getDaemonsManager()->servicesEventLoop() ;
92        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
93        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
94      }
95    }
96   
97    if (ok) 
98    {
99      notifyType_=NOTIFY_CREATE_INTERCOMM ;
100      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
101      info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ;
102      sendNotification(contextLeader) ;
103      return true ;
104    }
105    else return false ;
106  }
107
108  void CContextsManager::sendNotification(int rank)
109  {
110    winNotify_->lockWindowExclusive(rank) ;
111    winNotify_->pushToLockedWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
112    winNotify_->unlockWindowExclusive(rank) ;
113  }
114
115 
116  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
117  {
118   
119    buffer.realloc(maxBufferSize_) ;
120   
121    if (notifyType_==NOTIFY_CREATE_CONTEXT)
122    {
123      auto& arg=notifyCreateContext_ ;
124      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
125    }
126    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
127    {
128      auto& arg=notifyCreateIntercomm_ ;
129      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
130    }
131  }
132
133  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
134  {
135    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
136    else
137    {
138      buffer>>notifyType_;
139      if (notifyType_==NOTIFY_CREATE_CONTEXT)
140      {
141        auto& arg=notifyCreateContext_ ;
142        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
143      }
144      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
145      {
146        auto& arg=notifyCreateIntercomm_ ;
147        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
148      }
149    }
150
151  }
152
153  void CContextsManager::eventLoop(void)
154  {
155    CTimer::get("CContextsManager::eventLoop").resume();
156    int flag ;
157    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
158    double time=MPI_Wtime() ;
159    if (time-lastEventLoop_ > eventLoopLatency_) 
160    {
161      checkNotifications() ;
162      lastEventLoop_=time ;
163    }
164    CTimer::get("CContextsManager::eventLoop").suspend();
165  }
166 
167  void CContextsManager::checkNotifications(void)
168  {
169    int commRank ;
170    MPI_Comm_rank(xiosComm_, &commRank) ;
171    winNotify_->popFromExclusiveWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
172    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
173    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
174
175  }
176
177  void CContextsManager::createServerContext(void)
178  {
179    info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ;
180    auto arg=notifyCreateContext_ ;
181    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
182                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
183 
184  }
185
186  void CContextsManager::createServerContextIntercomm(void)
187  {
188    info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ;
189    auto arg=notifyCreateIntercomm_ ;
190    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
191                             ->getServerContext(get<3>(arg))
192                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
193  }             
194
195  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
196                                                const int& type, const string& contextId)
197  {
198    if (type==CServicesManager::CLIENT) return contextId;
199    else
200    {
201      ostringstream oss;
202      oss<<partitionId;
203      return poolId+"__"+serviceId+"_"+oss.str()+"__"+contextId;
204    }
205  }
206
207  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
208  {
209    winContexts_->lockWindowExclusive(managerGlobalLeader_) ;
210    winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
211    winContexts_->flushWindow(managerGlobalLeader_) ;
212    contexts_[fullContextId] = contextInfo ;
213    winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
214    winContexts_->unlockWindowExclusive(managerGlobalLeader_) ;
215  }
216
217  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
218  {
219    bool ret ;
220    int commRank=0 ;
221    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
222
223    if (commRank==0)
224    {
225
226      winContexts_->updateFromSharedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
227
228      auto it=contexts_.find(fullContextId) ;
229      if ( it == contexts_.end()) ret=false ;
230      else
231      {
232        contextInfo=it->second ; 
233        ret=true ;
234      }
235    }
236   
237    if (comm!=MPI_COMM_NULL) 
238    {
239      MPI_Bcast(&ret,1,MPI_INT,0,comm) ;
240      if (ret)
241      {
242        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
243        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
244        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
245        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
246        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
247        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
248        MPI_Bcast_string(contextInfo.id,0,comm) ;
249      }
250    }
251    return ret ;
252  }
253
254  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
255  {
256    SRegisterContextInfo contextInfo ;
257    bool ret=getContextInfo(fullContextId, contextInfo) ;
258    if (ret) leader=contextInfo.leader ;
259    return ret ;
260  }
261
262  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
263  {
264
265    SRegisterContextInfo contextInfo ;
266    bool ret=getContextInfo(fullContextId, contextInfo) ;
267    if (ret) size=contextInfo.size ;
268    return ret ;
269  }
270
271  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
272  {
273    SRegisterContextInfo contextInfo ;
274    bool ret=getContextInfo(fullContextId, contextInfo) ;
275    if (ret) poolId=contextInfo.poolId ;
276    return ret ;
277  }
278
279  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
280  {
281    SRegisterContextInfo contextInfo ;
282    bool ret=getContextInfo(fullContextId, contextInfo) ;
283    if (ret) serviceId=contextInfo.serviceId ;
284    return ret ;
285  }
286
287  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
288  {
289    SRegisterContextInfo contextInfo ;
290    bool ret=getContextInfo(fullContextId, contextInfo) ;
291    if (ret) partitionId=contextInfo.partitionId ;
292    return ret ;
293  }
294 
295  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
296  {
297    SRegisterContextInfo contextInfo ;
298    bool ret=getContextInfo(fullContextId, contextInfo) ;
299    if (ret) serviceType=contextInfo.serviceType ;
300    return ret ;
301  }
302
303  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
304  {
305    SRegisterContextInfo contextInfo ;
306    bool ret=getContextInfo(fullContextId, contextInfo) ;
307    if (ret) contextId=contextInfo.id ;
308    return ret ;
309  }
310
311
312  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
313  {
314    SRegisterContextInfo contextInfo ;
315    return getContextInfo(fullContextId, contextInfo) ;
316  }
317
318  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
319  {
320    buffer.realloc(maxBufferSize_) ;
321    buffer<<(int)contexts_.size();
322   
323    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
324    { 
325      auto key = it->first ;
326      auto val = it->second ; 
327      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
328    }
329  } 
330
331  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
332  {
333    std::string contextId ;
334    SRegisterContextInfo ci;
335    int size; 
336    int leader ;
337
338    contexts_.clear() ;
339    int nbContexts ;
340    buffer>>nbContexts ;
341    for(int i=0;i<nbContexts;i++) 
342    {
343      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
344      contexts_[contextId]=ci ;
345    }
346
347  }
348}
Note: See TracBrowser for help on using the repository browser.