source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/contexts_manager.cpp @ 2246

Last change on this file since 2246 was 2246, checked in by ymipsl, 3 years ago
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.5 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include "timer.hpp"
10#include <functional>
11
12
13namespace xios
14{
15  using namespace std ;
16
17  CContextsManager::CContextsManager(bool isXiosServer)
18  {
19    xiosComm_ = CXios::getXiosComm()  ;
20   
21    int commRank ; 
22    MPI_Comm_rank(xiosComm_, &commRank) ;
23    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
24    else commRank=0 ;
25    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
26
27    MPI_Comm_rank(xiosComm_, &commRank) ;
28    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
29   
30
31    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
32    winContexts_->lockWindow(commRank,0) ;
33    winContexts_->updateToWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
34    winContexts_->unlockWindow(commRank,0) ;
35
36    MPI_Barrier(xiosComm_)  ;   
37  }
38
39
40  CContextsManager::~CContextsManager()
41  {
42    delete winNotify_ ;
43    delete winContexts_ ;
44  }
45
46  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
47                                             const string& contextId, bool wait)
48  {
49    int serviceLeader ;
50    auto servicesManager = CXios::getServicesManager() ;
51   
52    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
53
54    info(40)<<"CContextsManager::createServerContext : waiting for service leader ;  serviceId : "<<serviceId<<endl ;
55    if (wait)
56    {
57      while (!ok) 
58      {
59        CXios::getDaemonsManager()->servicesEventLoop() ;
60        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
61      }
62    }
63
64    if (ok) 
65    {
66      notifyType_=NOTIFY_CREATE_CONTEXT ;
67      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
68      info(40)<<"CContextsManager::createServerContext : notification create_context to service leader "<<serviceLeader<<", serviceId : "<<serviceId<<", contextId "<<contextId<<endl ;
69      sendNotification(serviceLeader) ;
70      return true ;
71    }
72    else return false ;
73  }
74
75
76  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
77                                                      const string& contextId, const string& sourceContext, bool wait)
78  {
79    int contextLeader ;
80    bool ok ;
81    int remoteLeader ;
82    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
83   
84    int type ;
85    info(40)<<"CContextsManager::createServerContextIntercomm : waiting for context leader ;  contextId : "<<contextId<<endl ;
86    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
87    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
88    if (wait)
89    {
90      while (!ok) 
91      {
92        CXios::getDaemonsManager()->servicesEventLoop() ;
93        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
94        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
95      }
96    }
97   
98    if (ok) 
99    {
100      notifyType_=NOTIFY_CREATE_INTERCOMM ;
101      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
102      info(40)<<"CContextsManager::createServerContextIntercomm : notification create_intercomm to context leader : "<<contextLeader<<", contextId :"<<contextId<<endl ;
103      sendNotification(contextLeader) ;
104      return true ;
105    }
106    else return false ;
107  }
108
109  void CContextsManager::sendNotification(int rank)
110  {
111    winNotify_->lockWindow(rank,0) ;
112    winNotify_->pushToWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
113    winNotify_->unlockWindow(rank,0) ;
114  }
115
116 
117  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
118  {
119   
120    buffer.realloc(maxBufferSize_) ;
121   
122    if (notifyType_==NOTIFY_CREATE_CONTEXT)
123    {
124      auto& arg=notifyCreateContext_ ;
125      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
126    }
127    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
128    {
129      auto& arg=notifyCreateIntercomm_ ;
130      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
131    }
132  }
133
134  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
135  {
136    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
137    else
138    {
139      buffer>>notifyType_;
140      if (notifyType_==NOTIFY_CREATE_CONTEXT)
141      {
142        auto& arg=notifyCreateContext_ ;
143        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
144      }
145      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
146      {
147        auto& arg=notifyCreateIntercomm_ ;
148        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
149      }
150    }
151
152  }
153
154  void CContextsManager::eventLoop(void)
155  {
156    CTimer::get("CContextsManager::eventLoop").resume();
157    double time=MPI_Wtime() ;
158    if (time-lastEventLoop_ > eventLoopLatency_) 
159    {
160      checkNotifications() ;
161      lastEventLoop_=time ;
162    }
163    CTimer::get("CContextsManager::eventLoop").suspend();
164  }
165 
166  void CContextsManager::checkNotifications(void)
167  {
168    int commRank ;
169    MPI_Comm_rank(xiosComm_, &commRank) ;
170    winNotify_->lockWindow(commRank,0) ;
171    winNotify_->popFromWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
172    winNotify_->unlockWindow(commRank,0) ;
173    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
174    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
175
176  }
177
178  void CContextsManager::createServerContext(void)
179  {
180    info(40)<<"CContextsManager::createServerContext : receive create server context notification"<<endl ;
181    auto arg=notifyCreateContext_ ;
182    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
183                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
184 
185  }
186
187  void CContextsManager::createServerContextIntercomm(void)
188  {
189    info(40)<<"CContextsManager::createServerContext : receive create intercomm context notification"<<endl ;
190    auto arg=notifyCreateIntercomm_ ;
191    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
192                             ->getServerContext(get<3>(arg))
193                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
194  }             
195
196  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
197                                                const int& type, const string& contextId)
198  {
199    if (type==CServicesManager::CLIENT) return contextId;
200    else
201    {
202      ostringstream oss;
203      oss<<partitionId;
204      return poolId+"::"+serviceId+"_"+oss.str()+"::"+contextId;
205    }
206  }
207
208  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
209  {
210    winContexts_->lockWindowExclusive(managerGlobalLeader_) ;
211    winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
212    winContexts_->flushWindow(managerGlobalLeader_) ;
213    contexts_[fullContextId] = contextInfo ;
214    winContexts_->updateToLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
215    winContexts_->unlockWindow(managerGlobalLeader_) ;
216  }
217
218  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
219  {
220    bool ret ;
221    int commRank=0 ;
222    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
223
224    if (commRank==0)
225    {
226
227      winContexts_->lockWindowShared(managerGlobalLeader_) ;
228      winContexts_->updateFromLockedWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
229      winContexts_->unlockWindow(managerGlobalLeader_) ;
230
231      auto it=contexts_.find(fullContextId) ;
232      if ( it == contexts_.end()) ret=false ;
233      else
234      {
235        contextInfo=it->second ; 
236        ret=true ;
237      }
238    }
239   
240    if (comm!=MPI_COMM_NULL) 
241    {
242      MPI_Bcast(&ret,1,MPI_INT,0,comm) ;
243      if (ret)
244      {
245        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
246        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
247        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
248        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
249        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
250        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
251        MPI_Bcast_string(contextInfo.id,0,comm) ;
252      }
253    }
254    return ret ;
255  }
256
257  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
258  {
259    SRegisterContextInfo contextInfo ;
260    bool ret=getContextInfo(fullContextId, contextInfo) ;
261    if (ret) leader=contextInfo.leader ;
262    return ret ;
263  }
264
265  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
266  {
267
268    SRegisterContextInfo contextInfo ;
269    bool ret=getContextInfo(fullContextId, contextInfo) ;
270    if (ret) size=contextInfo.size ;
271    return ret ;
272  }
273
274  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
275  {
276    SRegisterContextInfo contextInfo ;
277    bool ret=getContextInfo(fullContextId, contextInfo) ;
278    if (ret) poolId=contextInfo.poolId ;
279    return ret ;
280  }
281
282  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
283  {
284    SRegisterContextInfo contextInfo ;
285    bool ret=getContextInfo(fullContextId, contextInfo) ;
286    if (ret) serviceId=contextInfo.serviceId ;
287    return ret ;
288  }
289
290  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
291  {
292    SRegisterContextInfo contextInfo ;
293    bool ret=getContextInfo(fullContextId, contextInfo) ;
294    if (ret) partitionId=contextInfo.partitionId ;
295    return ret ;
296  }
297 
298  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
299  {
300    SRegisterContextInfo contextInfo ;
301    bool ret=getContextInfo(fullContextId, contextInfo) ;
302    if (ret) serviceType=contextInfo.serviceType ;
303    return ret ;
304  }
305
306  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
307  {
308    SRegisterContextInfo contextInfo ;
309    bool ret=getContextInfo(fullContextId, contextInfo) ;
310    if (ret) contextId=contextInfo.id ;
311    return ret ;
312  }
313
314
315  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
316  {
317    SRegisterContextInfo contextInfo ;
318    return getContextInfo(fullContextId, contextInfo) ;
319  }
320
321  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
322  {
323    buffer.realloc(maxBufferSize_) ;
324    buffer<<(int)contexts_.size();
325   
326    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
327    { 
328      auto key = it->first ;
329      auto val = it->second ; 
330      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
331    }
332  } 
333
334  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
335  {
336    std::string contextId ;
337    SRegisterContextInfo ci;
338    int size; 
339    int leader ;
340
341    contexts_.clear() ;
342    int nbContexts ;
343    buffer>>nbContexts ;
344    for(int i=0;i<nbContexts;i++) 
345    {
346      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
347      contexts_[contextId]=ci ;
348    }
349
350  }
351}
Note: See TracBrowser for help on using the repository browser.