source: XIOS/dev/dev_ym/XIOS_SERVICES/src/manager/server_context.cpp @ 1764

Last change on this file since 1764 was 1764, checked in by ymipsl, 4 years ago

Some Update on XIOS services
Seems to work on Irène for :

  • first level of servers
  • fisrt + second level of servers
  • attached mode

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.9 KB
Line 
1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8
9
10namespace xios
11{
12  using namespace std ;
13
14  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
15
16  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
17                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
18                                 hasNotification_(false)
19  {
20   int localRank, globalRank, commSize ;
21
22    MPI_Comm_dup(contextComm, &contextComm_) ;
23    xiosComm_=CXios::getXiosComm() ;
24 
25    MPI_Comm_rank(xiosComm_,&globalRank) ;
26    MPI_Comm_rank(contextComm_,&localRank) ;
27 
28    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_) ;
29   
30    int type;
31    if (localRank==localLeader_) 
32    {
33      globalLeader_=globalRank ;
34      MPI_Comm_rank(contextComm_,&commSize) ;
35     
36      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
37      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
38      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
39      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
40    }
41    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
42    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
43    context_=CContext::create(name_);
44
45    context_->init(this, contextComm, type) ;
46
47    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
48                        <<" and global rank "<<globalRank<<endl  ;
49  }
50
51  CServerContext::~CServerContext()
52  {
53
54  } 
55
56  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
57                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
58  {
59    int intraCommRank ;
60    MPI_Comm_rank(intraComm, &intraCommRank) ;
61    int contextLeader ;
62
63    bool ok ;
64    int type ;
65    MPI_Comm newInterCommClient, newInterCommServer ;
66    MPI_Comm_dup(contextComm_,&newInterCommClient) ;
67    MPI_Comm_dup(contextComm_,&newInterCommServer) ;
68    overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
69    MPI_Barrier(contextComm_) ;
70
71    if (intraCommRank==0)
72    {
73      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
74      if (ok) 
75      {
76        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
77        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
78        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
79      }
80    }
81   
82    MPI_Request req ;
83    MPI_Status status ;
84    MPI_Ibarrier(intraComm,&req) ;
85   
86    int flag=false ;
87    while(!flag) 
88    {
89      CXios::getDaemonsManager()->servicesEventLoop() ;
90      MPI_Test(&req,&flag,&status) ;
91    }
92//    auto eventScheduler=parentService_->getEventScheduler() ;
93//    std::hash<string> hashString ;
94//    size_t hashId = hashString(name_) ;
95//    size_t currentTimeLine=0 ;
96//    eventScheduler->registerEvent(currentTimeLine,hashId);
97//
98//    while (!eventScheduler->queryEvent(currentTimeLine,hashId))
99//    {
100//       CXios::getDaemonsManager()->servicesEventLoop() ;
101//       eventScheduler->checkEvent() ;
102//    } 
103   
104    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
105
106    if (ok) 
107    {
108      int globalRank ;
109      MPI_Comm_rank(xiosComm_,&globalRank) ;
110      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
111     
112      int overlap, nOverlap ;
113      if (contextLeader==globalRank) overlap=1 ;
114      else overlap=0 ;
115      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
116/*
117      int overlap  ;
118      if (get<0>(overlapedComm_[name_])) overlap=1 ;
119      else overlap=0 ;
120
121      int nOverlap ; 
122      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
123      int commSize ;
124      MPI_Comm_size(contextComm_,&commSize ) ;
125*/
126      if (nOverlap> 0 )
127      {
128        while (get<0>(overlapedComm_[name_])==false) CXios::getDaemonsManager()->servicesEventLoop() ;
129        isAttachedMode_=true ;
130        cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
131        interCommClient=newInterCommClient ;
132        interCommServer=newInterCommServer ;
133      }
134      else if (nOverlap==0)
135      { 
136        cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
137        isAttachedMode_=false ;
138        MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
139        MPI_Comm_dup(interCommClient, &interCommServer) ;
140        MPI_Comm_free(&newInterCommClient) ;
141        MPI_Comm_free(&newInterCommServer) ;
142      }
143      else
144      {
145        cout<<"CServerContext::createIntercomm : partial overlap ==> not managed"<<endl ;
146      }
147    }
148    overlapedComm_.erase(name_) ;
149    return ok ;
150  }
151
152
153  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
154  {
155     int commSize ;
156     MPI_Comm_size(contextComm_,&commSize) ;
157     for(int rank=0; rank<commSize; rank++)
158     {
159       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
160       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
161       sendNotification(rank) ;
162     }
163  }
164 
165  void CServerContext::sendNotification(int rank)
166  {
167    winNotify_->lockWindow(rank,0) ;
168    winNotify_->pushToWindow(rank, this, &CServerContext::notificationsDumpOut) ;
169    winNotify_->unlockWindow(rank,0) ;
170  }
171
172 
173  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
174  {
175   
176    buffer.realloc(maxBufferSize_) ;
177   
178    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
179    {
180      auto& arg=notifyOutCreateIntercomm_ ;
181      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
182    }
183  }
184
185  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
186  {
187    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
188    else
189    {
190      buffer>>notifyInType_;
191      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
192      {
193        auto& arg=notifyInCreateIntercomm_ ;
194        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
195      }
196    }
197  }
198
199  void CServerContext::checkNotifications(void)
200  {
201    if (!hasNotification_)
202    {
203      int commRank ;
204      MPI_Comm_rank(contextComm_, &commRank) ;
205      winNotify_->lockWindow(commRank,0) ;
206      winNotify_->popFromWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
207      winNotify_->unlockWindow(commRank,0) ;
208     
209      if (notifyInType_!= NOTIFY_NOTHING)
210      {
211        hasNotification_=true ;
212        auto eventScheduler=parentService_->getEventScheduler() ;
213        std::hash<string> hashString ;
214        size_t hashId = hashString(name_) ;
215        size_t currentTimeLine=0 ;
216        eventScheduler->registerEvent(currentTimeLine,hashId); 
217      }
218    }
219   
220    if (hasNotification_)
221    {
222      auto eventScheduler=parentService_->getEventScheduler() ;
223      std::hash<string> hashString ;
224      size_t hashId = hashString(name_) ;
225      size_t currentTimeLine=0 ;
226      if (eventScheduler->queryEvent(currentTimeLine,hashId))
227      {
228        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
229        hasNotification_=false ;
230      }
231    }
232  }
233
234  bool CServerContext::eventLoop(bool serviceOnly)
235  {
236    bool finished=false ;
237    if (winNotify_!=nullptr) checkNotifications() ;
238    if (!serviceOnly && context_!=nullptr) 
239    {
240      if (context_->eventLoop())
241      {
242        context_=nullptr ;
243        // destroy context ??? --> later
244      }
245    }
246
247    if (context_==nullptr && finalizeSignal_) finished=true ;
248    return finished ;
249  }
250
251  void CServerContext::createIntercomm(void)
252  {
253     MPI_Comm interCommServer, interCommClient ;
254     auto& arg=notifyInCreateIntercomm_ ;
255     int remoteLeader=get<0>(arg) ;
256     string sourceContext=get<1>(arg) ;
257
258     auto it=overlapedComm_.find(sourceContext) ;
259     int overlap=0 ;
260     if (it!=overlapedComm_.end())
261     {
262       get<0>(it->second)=true ;
263       overlap=1 ;
264     }
265     int nOverlap ; 
266     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
267     int commSize ;
268     MPI_Comm_size(contextComm_,&commSize ) ;
269
270    if (nOverlap==commSize)
271    {
272      info(10)<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
273      isAttachedMode_=true ;
274      interCommClient=get<2>(it->second) ;
275      interCommServer=get<1>(it->second) ;
276      context_ -> createClientInterComm(interCommClient, interCommServer ) ;
277      clientsInterComm_.push_back(interCommClient) ;
278      clientsInterComm_.push_back(interCommServer) ;
279    }
280    else if (nOverlap==0)
281    { 
282      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
283      isAttachedMode_=false ;
284      MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
285      MPI_Comm_dup(interCommServer,&interCommClient) ;
286      context_ -> createClientInterComm(interCommClient,interCommServer) ;
287      clientsInterComm_.push_back(interCommClient) ;
288      clientsInterComm_.push_back(interCommServer) ;
289    }
290    else
291    {
292      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : partial overlap ==> not managed") ;
293    }
294   
295  }
296
297  void CServerContext::freeComm(void)
298  {
299    delete winNotify_ ;
300    winNotify_=nullptr ;
301    MPI_Comm_free(&contextComm_) ;
302    // don't forget intercomm -> later
303  }
304 
305  void CServerContext::finalizeSignal(void)
306  {
307    finalizeSignal_=true ;
308  }
309
310}
Note: See TracBrowser for help on using the repository browser.