source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.cpp @ 2258

Last change on this file since 2258 was 2258, checked in by ymipsl, 3 years ago

One sided protocol improvment.
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.4 KB
Line 
1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8#include "timer.hpp"
9
10
11namespace xios
12{
13  using namespace std ;
14
15  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
16
17  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
18                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
19                                 hasNotification_(false)
20  {
21   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ;
22   int localRank, globalRank, commSize ;
23
24    MPI_Comm_dup(contextComm, &contextComm_) ;
25    xiosComm_=CXios::getXiosComm() ;
26 
27    MPI_Comm_rank(xiosComm_,&globalRank) ;
28    MPI_Comm_rank(contextComm_,&localRank) ;
29 
30    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_) ;
31    MPI_Barrier(contextComm_) ;
32   
33    int type;
34    if (localRank==localLeader_) 
35    {
36      globalLeader_=globalRank ;
37      MPI_Comm_rank(contextComm_,&commSize) ;
38     
39      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
40      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
41      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
42      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
43    }
44    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
45    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
46    context_=CContext::create(name_);
47
48    context_->init(this, contextComm, type) ;
49
50    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
51                        <<" and global rank "<<globalRank<<endl  ;
52  }
53
54  CServerContext::~CServerContext()
55  {
56
57  } 
58
59  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
60                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
61  {
62    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ;
63    int intraCommRank ;
64    MPI_Comm_rank(intraComm, &intraCommRank) ;
65    int contextLeader ;
66
67    bool ok ;
68    int type ;
69    MPI_Comm newInterCommClient, newInterCommServer ;
70    MPI_Comm_dup(contextComm_,&newInterCommClient) ;
71    MPI_Comm_dup(contextComm_,&newInterCommServer) ;
72    overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
73    MPI_Barrier(contextComm_) ;
74
75    if (intraCommRank==0)
76    {
77      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
78      if (ok) 
79      {
80        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
81        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
82        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
83      }
84    }
85   
86    MPI_Request req ;
87    MPI_Status status ;
88    MPI_Ibarrier(intraComm,&req) ;
89   
90    int flag=false ;
91    while(!flag) 
92    {
93      CXios::getDaemonsManager()->servicesEventLoop() ;
94      MPI_Test(&req,&flag,&status) ;
95    }
96
97    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
98
99    if (ok) 
100    {
101      int globalRank ;
102      MPI_Comm_rank(xiosComm_,&globalRank) ;
103      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
104     
105      int overlap, nOverlap ;
106      if (contextLeader==globalRank) overlap=1 ;
107      else overlap=0 ;
108      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
109/*
110      int overlap  ;
111      if (get<0>(overlapedComm_[name_])) overlap=1 ;
112      else overlap=0 ;
113
114      int nOverlap ; 
115      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
116      int commSize ;
117      MPI_Comm_size(contextComm_,&commSize ) ;
118*/
119      if (nOverlap> 0 )
120      {
121        while (get<0>(overlapedComm_[name_])==false) CXios::getDaemonsManager()->servicesEventLoop() ;
122        isAttachedMode_=true ;
123        cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
124        interCommClient=newInterCommClient ;
125        interCommServer=newInterCommServer ;
126      }
127      else if (nOverlap==0)
128      { 
129        cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
130        isAttachedMode_=false ;
131        MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
132        MPI_Comm_dup(interCommClient, &interCommServer) ;
133        MPI_Comm_free(&newInterCommClient) ;
134        MPI_Comm_free(&newInterCommServer) ;
135      }
136      else
137      {
138        cout<<"CServerContext::createIntercomm : partial overlap ==> not managed"<<endl ;
139      }
140    }
141    overlapedComm_.erase(name_) ;
142    return ok ;
143  }
144
145
146  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
147  {
148     int commSize ;
149     MPI_Comm_size(contextComm_,&commSize) ;
150     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ;
151   
152     for(int rank=0; rank<commSize; rank++)
153     {
154       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
155       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
156       sendNotification(rank) ;
157     }
158  }
159 
160  void CServerContext::sendNotification(int rank)
161  {
162    winNotify_->lockWindowExclusive(rank) ;
163    winNotify_->pushToLockedWindow(rank, this, &CServerContext::notificationsDumpOut) ;
164    winNotify_->unlockWindow(rank) ;
165  }
166
167 
168  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
169  {
170   
171    buffer.realloc(maxBufferSize_) ;
172   
173    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
174    {
175      auto& arg=notifyOutCreateIntercomm_ ;
176      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
177    }
178  }
179
180  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
181  {
182    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
183    else
184    {
185      buffer>>notifyInType_;
186      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
187      {
188        auto& arg=notifyInCreateIntercomm_ ;
189        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
190      }
191    }
192  }
193
194  void CServerContext::checkNotifications(void)
195  {
196    if (!hasNotification_)
197    {
198      double time=MPI_Wtime() ;
199      if (time-lastEventLoop_ > eventLoopLatency_) 
200      {
201        int commRank ;
202        MPI_Comm_rank(contextComm_, &commRank) ;
203        winNotify_->lockWindowExclusive(commRank) ;
204        winNotify_->popFromLockedWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
205        winNotify_->unlockWindow(commRank) ;
206     
207        if (notifyInType_!= NOTIFY_NOTHING)
208        {
209          hasNotification_=true ;
210          auto eventScheduler=parentService_->getEventScheduler() ;
211          std::hash<string> hashString ;
212          size_t hashId = hashString(name_) ;
213          size_t currentTimeLine=0 ;
214          eventScheduler->registerEvent(currentTimeLine,hashId); 
215        }
216        lastEventLoop_=time ;
217      }
218    }
219   
220    if (hasNotification_)
221    {
222      auto eventScheduler=parentService_->getEventScheduler() ;
223      std::hash<string> hashString ;
224      size_t hashId = hashString(name_) ;
225      size_t currentTimeLine=0 ;
226      if (eventScheduler->queryEvent(currentTimeLine,hashId))
227      {
228        eventScheduler->popEvent() ;
229        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
230        hasNotification_=false ;
231      }
232    }
233  }
234
235  bool CServerContext::eventLoop(bool serviceOnly)
236  {
237    CTimer::get("CServerContext::eventLoop").resume();
238    bool finished=false ;
239   
240//    double time=MPI_Wtime() ;
241//    if (time-lastEventLoop_ > eventLoopLatency_)
242//    {
243      if (winNotify_!=nullptr) checkNotifications() ;
244//      lastEventLoop_=time ;
245//    }
246
247
248    if (!serviceOnly && context_!=nullptr) 
249    {
250      if (context_->eventLoop())
251      {
252        context_=nullptr ;
253        // destroy context ??? --> later
254      }
255    }
256    CTimer::get("CServerContext::eventLoop").suspend();
257    if (context_==nullptr && finalizeSignal_) finished=true ;
258    return finished ;
259  }
260
261  void CServerContext::createIntercomm(void)
262  {
263    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ;
264
265     MPI_Comm interCommServer, interCommClient ;
266     auto& arg=notifyInCreateIntercomm_ ;
267     int remoteLeader=get<0>(arg) ;
268     string sourceContext=get<1>(arg) ;
269
270     auto it=overlapedComm_.find(sourceContext) ;
271     int overlap=0 ;
272     if (it!=overlapedComm_.end())
273     {
274       get<0>(it->second)=true ;
275       overlap=1 ;
276     }
277     int nOverlap ; 
278     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
279     int commSize ;
280     MPI_Comm_size(contextComm_,&commSize ) ;
281
282    if (nOverlap==commSize)
283    {
284      info(10)<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
285      isAttachedMode_=true ;
286      interCommClient=get<2>(it->second) ;
287      interCommServer=get<1>(it->second) ;
288      context_ -> createClientInterComm(interCommClient, interCommServer ) ;
289      clientsInterComm_.push_back(interCommClient) ;
290      clientsInterComm_.push_back(interCommServer) ;
291    }
292    else if (nOverlap==0)
293    { 
294      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
295      isAttachedMode_=false ;
296      MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
297      MPI_Comm_dup(interCommServer,&interCommClient) ;
298      context_ -> createClientInterComm(interCommClient,interCommServer) ;
299      clientsInterComm_.push_back(interCommClient) ;
300      clientsInterComm_.push_back(interCommServer) ;
301    }
302    else
303    {
304      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : partial overlap ==> not managed") ;
305    }
306   
307  }
308
309  void CServerContext::freeComm(void)
310  {
311    delete winNotify_ ;
312    winNotify_=nullptr ;
313    MPI_Comm_free(&contextComm_) ;
314    // don't forget intercomm -> later
315  }
316 
317  void CServerContext::finalizeSignal(void)
318  {
319    finalizeSignal_=true ;
320  }
321
322}
Note: See TracBrowser for help on using the repository browser.