source: XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/contexts_manager.cpp @ 1765

Last change on this file since 1765 was 1765, checked in by ymipsl, 4 years ago

Some cleaning On XIOS services branch

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.4 KB
Line 
1#include "contexts_manager.hpp"
2#include "cxios.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "services.hpp"
6#include "server_context.hpp"
7#include "servers_ressource.hpp"
8#include "server.hpp"
9#include <functional>
10
11
12namespace xios
13{
14  using namespace std ;
15
16  CContextsManager::CContextsManager(bool isXiosServer)
17  {
18    xiosComm_ = CXios::getXiosComm()  ;
19   
20    int commRank ; 
21    MPI_Comm_rank(xiosComm_, &commRank) ;
22    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ; 
23    else commRank=0 ;
24    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ;
25
26    MPI_Comm_rank(xiosComm_, &commRank) ;
27    winNotify_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
28   
29
30    winContexts_ = new CWindowManager(xiosComm_, maxBufferSize_) ;
31    winContexts_->lockWindow(commRank,0) ;
32    winContexts_->updateToWindow(commRank, this, &CContextsManager::contextsDumpOut) ;
33    winContexts_->unlockWindow(commRank,0) ;
34
35    MPI_Barrier(xiosComm_)  ;   
36  }
37
38
39  CContextsManager::~CContextsManager()
40  {
41    delete winNotify_ ;
42    delete winContexts_ ;
43  }
44
45  bool CContextsManager::createServerContext(const std::string& poolId, const std::string& serviceId, const int& partitionId,
46                                             const string& contextId, bool wait)
47  {
48    int serviceLeader ;
49    auto servicesManager = CXios::getServicesManager() ;
50   
51    bool ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
52
53    if (wait)
54    {
55      while (!ok) 
56      {
57        CXios::getDaemonsManager()->servicesEventLoop() ;
58        ok=servicesManager->getServiceLeader(poolId, serviceId, partitionId, serviceLeader) ;
59      }
60    }
61
62    if (ok) 
63    {
64      notifyType_=NOTIFY_CREATE_CONTEXT ;
65      notifyCreateContext_=make_tuple(poolId, serviceId, partitionId, contextId) ;
66      sendNotification(serviceLeader) ;
67      return true ;
68    }
69    else return false ;
70  }
71
72
73  bool CContextsManager::createServerContextIntercomm(const string& poolId, const string& serviceId, const int& partitionId,
74                                                      const string& contextId, const string& sourceContext, bool wait)
75  {
76    int contextLeader ;
77    bool ok ;
78    int remoteLeader ;
79    MPI_Comm_rank(xiosComm_, &remoteLeader) ;
80   
81    int type ;
82    ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
83    if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
84    if (wait)
85    {
86      while (!ok) 
87      {
88        CXios::getDaemonsManager()->servicesEventLoop() ;
89        ok=CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
90        if (ok) ok=getContextLeader(getServerContextName(poolId, serviceId, partitionId, type, contextId), contextLeader) ;
91      }
92    }
93   
94    if (ok) 
95    {
96      notifyType_=NOTIFY_CREATE_INTERCOMM ;
97      notifyCreateIntercomm_=make_tuple(poolId, serviceId, partitionId, contextId, remoteLeader, sourceContext) ;
98      sendNotification(contextLeader) ;
99      return true ;
100    }
101    else return false ;
102  }
103
104  void CContextsManager::sendNotification(int rank)
105  {
106    winNotify_->lockWindow(rank,0) ;
107    winNotify_->pushToWindow(rank, this, &CContextsManager::notificationsDumpOut) ;
108    winNotify_->unlockWindow(rank,0) ;
109  }
110
111 
112  void CContextsManager::notificationsDumpOut(CBufferOut& buffer)
113  {
114   
115    buffer.realloc(maxBufferSize_) ;
116   
117    if (notifyType_==NOTIFY_CREATE_CONTEXT)
118    {
119      auto& arg=notifyCreateContext_ ;
120      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) ;
121    }
122    else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
123    {
124      auto& arg=notifyCreateIntercomm_ ;
125      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg)<< get<5>(arg) ;
126    }
127  }
128
129  void CContextsManager::notificationsDumpIn(CBufferIn& buffer)
130  {
131    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
132    else
133    {
134      buffer>>notifyType_;
135      if (notifyType_==NOTIFY_CREATE_CONTEXT)
136      {
137        auto& arg=notifyCreateContext_ ;
138        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg) ;
139      }
140      else if (notifyType_==NOTIFY_CREATE_INTERCOMM)
141      {
142        auto& arg=notifyCreateIntercomm_ ;
143        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg) >> get<3>(arg) >> get<4>(arg) >> get<5>(arg);
144      }
145    }
146
147  }
148
149  void CContextsManager::eventLoop(void)
150  {
151    checkNotifications() ;
152  }
153 
154  void CContextsManager::checkNotifications(void)
155  {
156    int commRank ;
157    MPI_Comm_rank(xiosComm_, &commRank) ;
158    winNotify_->lockWindow(commRank,0) ;
159    winNotify_->popFromWindow(commRank, this, &CContextsManager::notificationsDumpIn) ;
160    winNotify_->unlockWindow(commRank,0) ;
161    if (notifyType_==NOTIFY_CREATE_CONTEXT) createServerContext() ;
162    else if (notifyType_==NOTIFY_CREATE_INTERCOMM) createServerContextIntercomm() ;
163
164  }
165
166  void CContextsManager::createServerContext(void)
167  {
168    auto arg=notifyCreateContext_ ;
169    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
170                             ->createContext(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg)) ;
171 
172  }
173
174  void CContextsManager::createServerContextIntercomm(void)
175  {
176    auto arg=notifyCreateIntercomm_ ;
177    CXios::getPoolRessource()->getService(get<1>(arg), get<2>(arg))
178                             ->getServerContext(get<3>(arg))
179                             ->createIntercomm(get<4>(arg), get<5>(arg)) ;
180  }             
181
182  string CContextsManager::getServerContextName(const string& poolId, const string& serviceId, const int& partitionId, 
183                                                const int& type, const string& contextId)
184  {
185    if (type==CServicesManager::CLIENT) return contextId;
186    else
187    {
188      ostringstream oss;
189      oss<<partitionId;
190      return poolId+"::"+serviceId+"_"+oss.str()+"::"+contextId;
191    }
192  }
193
194  void CContextsManager::registerContext(const string& fullContextId, const SRegisterContextInfo& contextInfo)
195  {
196    winContexts_->lockWindow(managerGlobalLeader_,0) ;
197    winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
198    contexts_[fullContextId] = contextInfo ;
199    winContexts_->updateToWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpOut) ;
200    winContexts_->unlockWindow(managerGlobalLeader_,0) ;   
201  }
202
203  bool CContextsManager::getContextInfo(const string& fullContextId, SRegisterContextInfo& contextInfo, MPI_Comm comm)
204  {
205    bool ret ;
206    int commRank=0 ;
207    if (comm!=MPI_COMM_NULL) MPI_Comm_rank(comm, &commRank) ;
208
209    if (commRank==0)
210    {
211
212      winContexts_->lockWindow(managerGlobalLeader_,0) ;
213      winContexts_->updateFromWindow(managerGlobalLeader_, this, &CContextsManager::contextsDumpIn) ;
214      winContexts_->unlockWindow(managerGlobalLeader_,0) ;
215
216      auto it=contexts_.find(fullContextId) ;
217      if ( it == contexts_.end()) ret=false ;
218      else
219      {
220        contextInfo=it->second ; 
221        ret=true ;
222      }
223    }
224   
225    if (comm!=MPI_COMM_NULL) 
226    {
227      MPI_Bcast(&ret,1,MPI_INT,0,comm) ;
228      if (ret)
229      {
230        MPI_Bcast(&contextInfo.leader,1,MPI_INT,0,comm) ;
231        MPI_Bcast(&contextInfo.size,1,MPI_INT,0,comm) ;
232        MPI_Bcast_string(contextInfo.poolId,0,comm) ;
233        MPI_Bcast_string(contextInfo.serviceId,0,comm) ;
234        MPI_Bcast(&contextInfo.serviceType,1,MPI_INT,0,comm) ;
235        MPI_Bcast(&contextInfo.partitionId,1,MPI_INT,0,comm) ;
236        MPI_Bcast_string(contextInfo.id,0,comm) ;
237      }
238    }
239    return ret ;
240  }
241
242  bool CContextsManager::getContextLeader(const string& fullContextId, int& leader, MPI_Comm comm)
243  {
244    SRegisterContextInfo contextInfo ;
245    bool ret=getContextInfo(fullContextId, contextInfo) ;
246    if (ret) leader=contextInfo.leader ;
247    return ret ;
248  }
249
250  bool CContextsManager::getContextSize(const string& fullContextId, int& size, MPI_Comm comm)
251  {
252
253    SRegisterContextInfo contextInfo ;
254    bool ret=getContextInfo(fullContextId, contextInfo) ;
255    if (ret) size=contextInfo.size ;
256    return ret ;
257  }
258
259  bool CContextsManager::getContextPoolId(const string& fullContextId, string& poolId, MPI_Comm comm)
260  {
261    SRegisterContextInfo contextInfo ;
262    bool ret=getContextInfo(fullContextId, contextInfo) ;
263    if (ret) poolId=contextInfo.poolId ;
264    return ret ;
265  }
266
267  bool CContextsManager::getContextServiceId(const string& fullContextId, string& serviceId, MPI_Comm comm)
268  {
269    SRegisterContextInfo contextInfo ;
270    bool ret=getContextInfo(fullContextId, contextInfo) ;
271    if (ret) serviceId=contextInfo.serviceId ;
272    return ret ;
273  }
274
275  bool CContextsManager::getContextPartitionId(const string& fullContextId, int& partitionId, MPI_Comm comm)
276  {
277    SRegisterContextInfo contextInfo ;
278    bool ret=getContextInfo(fullContextId, contextInfo) ;
279    if (ret) partitionId=contextInfo.partitionId ;
280    return ret ;
281  }
282 
283  bool CContextsManager::getContextServiceType(const string& fullContextId, int& serviceType, MPI_Comm comm)
284  {
285    SRegisterContextInfo contextInfo ;
286    bool ret=getContextInfo(fullContextId, contextInfo) ;
287    if (ret) serviceType=contextInfo.serviceType ;
288    return ret ;
289  }
290
291  bool CContextsManager::getContextId(const string& fullContextId, string& contextId, MPI_Comm comm)
292  {
293    SRegisterContextInfo contextInfo ;
294    bool ret=getContextInfo(fullContextId, contextInfo) ;
295    if (ret) contextId=contextInfo.id ;
296    return ret ;
297  }
298
299
300  bool CContextsManager::hasContext(const string& fullContextId, MPI_Comm comm)
301  {
302    SRegisterContextInfo contextInfo ;
303    return getContextInfo(fullContextId, contextInfo) ;
304  }
305
306  void CContextsManager::contextsDumpOut(CBufferOut& buffer)
307  {
308    buffer.realloc(maxBufferSize_) ;
309    buffer<<(int)contexts_.size();
310   
311    for(auto it=contexts_.begin();it!=contexts_.end(); ++it)
312    { 
313      auto key = it->first ;
314      auto val = it->second ; 
315      buffer << key << val.poolId<<val.serviceId<<val.partitionId<<val.serviceType<<val.id<<val.size<<val.leader  ;
316    }
317  } 
318
319  void CContextsManager::contextsDumpIn(CBufferIn& buffer)
320  {
321    std::string contextId ;
322    SRegisterContextInfo ci;
323    int size; 
324    int leader ;
325
326    contexts_.clear() ;
327    int nbContexts ;
328    buffer>>nbContexts ;
329    for(int i=0;i<nbContexts;i++) 
330    {
331      buffer>>contextId>>ci.poolId>>ci.serviceId>>ci.partitionId>>ci.serviceType>>ci.id>>ci.size>>ci.leader ;
332      contexts_[contextId]=ci ;
333    }
334
335  }
336}
Note: See TracBrowser for help on using the repository browser.