source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/server_context.cpp @ 2265

Last change on this file since 2265 was 2265, checked in by ymipsl, 2 years ago

tracking memory leak : remove context and all related object from object factory when context is finalized.
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.6 KB
Line 
1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8#include "timer.hpp"
9
10
11namespace xios
12{
13  using namespace std ;
14
15  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
16
17  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
18                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
19                                 hasNotification_(false)
20  {
21   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ;
22   int localRank, globalRank, commSize ;
23
24    MPI_Comm_dup(contextComm, &contextComm_) ;
25    xiosComm_=CXios::getXiosComm() ;
26 
27    MPI_Comm_rank(xiosComm_,&globalRank) ;
28    MPI_Comm_rank(contextComm_,&localRank) ;
29 
30    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_) ;
31    MPI_Barrier(contextComm_) ;
32   
33    int type;
34    if (localRank==localLeader_) 
35    {
36      globalLeader_=globalRank ;
37      MPI_Comm_rank(contextComm_,&commSize) ;
38     
39      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
40      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
41      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
42      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
43    }
44    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
45    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
46    context_=CContext::create(name_);
47
48    context_->init(this, contextComm, type) ;
49
50    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
51                        <<" and global rank "<<globalRank<<endl  ;
52  }
53
54  CServerContext::~CServerContext()
55  {
56
57  } 
58
59  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
60                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
61  {
62    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ;
63    int intraCommRank ;
64    MPI_Comm_rank(intraComm, &intraCommRank) ;
65    int contextLeader ;
66
67    bool ok ;
68    int type ;
69    MPI_Comm newInterCommClient, newInterCommServer ;
70    MPI_Comm_dup(contextComm_,&newInterCommClient) ;
71    MPI_Comm_dup(contextComm_,&newInterCommServer) ;
72    overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
73    MPI_Barrier(contextComm_) ;
74
75    if (intraCommRank==0)
76    {
77      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
78      if (ok) 
79      {
80        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
81        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
82        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
83      }
84    }
85   
86    MPI_Request req ;
87    MPI_Status status ;
88    MPI_Ibarrier(intraComm,&req) ;
89   
90    int flag=false ;
91    while(!flag) 
92    {
93      CXios::getDaemonsManager()->servicesEventLoop() ;
94      MPI_Test(&req,&flag,&status) ;
95    }
96
97    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
98
99    if (ok) 
100    {
101      int globalRank ;
102      MPI_Comm_rank(xiosComm_,&globalRank) ;
103      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
104     
105      int overlap, nOverlap ;
106      if (contextLeader==globalRank) overlap=1 ;
107      else overlap=0 ;
108      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
109/*
110      int overlap  ;
111      if (get<0>(overlapedComm_[name_])) overlap=1 ;
112      else overlap=0 ;
113
114      int nOverlap ; 
115      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
116      int commSize ;
117      MPI_Comm_size(contextComm_,&commSize ) ;
118*/
119      if (nOverlap> 0 )
120      {
121        while (get<0>(overlapedComm_[name_])==false) CXios::getDaemonsManager()->servicesEventLoop() ;
122        isAttachedMode_=true ;
123        cout<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
124        interCommClient=newInterCommClient ;
125        interCommServer=newInterCommServer ;
126      }
127      else if (nOverlap==0)
128      { 
129        cout<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
130        isAttachedMode_=false ;
131        MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
132        MPI_Comm_dup(interCommClient, &interCommServer) ;
133        MPI_Comm_free(&newInterCommClient) ;
134        MPI_Comm_free(&newInterCommServer) ;
135      }
136      else
137      {
138        cout<<"CServerContext::createIntercomm : partial overlap ==> not managed"<<endl ;
139      }
140    }
141    overlapedComm_.erase(name_) ;
142    return ok ;
143  }
144
145
146  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
147  {
148     int commSize ;
149     MPI_Comm_size(contextComm_,&commSize) ;
150     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ;
151   
152     for(int rank=0; rank<commSize; rank++)
153     {
154       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
155       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
156       sendNotification(rank) ;
157     }
158  }
159 
160  void CServerContext::sendNotification(int rank)
161  {
162    winNotify_->lockWindowExclusive(rank) ;
163    winNotify_->pushToLockedWindow(rank, this, &CServerContext::notificationsDumpOut) ;
164    winNotify_->unlockWindow(rank) ;
165  }
166
167 
168  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
169  {
170   
171    buffer.realloc(maxBufferSize_) ;
172   
173    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
174    {
175      auto& arg=notifyOutCreateIntercomm_ ;
176      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
177    }
178  }
179
180  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
181  {
182    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
183    else
184    {
185      buffer>>notifyInType_;
186      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
187      {
188        auto& arg=notifyInCreateIntercomm_ ;
189        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
190      }
191    }
192  }
193
194  void CServerContext::checkNotifications(void)
195  {
196    if (!hasNotification_)
197    {
198      double time=MPI_Wtime() ;
199      if (time-lastEventLoop_ > eventLoopLatency_) 
200      {
201        int commRank ;
202        MPI_Comm_rank(contextComm_, &commRank) ;
203        winNotify_->lockWindowExclusive(commRank) ;
204        winNotify_->popFromLockedWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
205        winNotify_->unlockWindow(commRank) ;
206     
207        if (notifyInType_!= NOTIFY_NOTHING)
208        {
209          hasNotification_=true ;
210          auto eventScheduler=parentService_->getEventScheduler() ;
211          std::hash<string> hashString ;
212          size_t hashId = hashString(name_) ;
213          size_t currentTimeLine=0 ;
214          eventScheduler->registerEvent(currentTimeLine,hashId); 
215        }
216        lastEventLoop_=time ;
217      }
218    }
219   
220    if (hasNotification_)
221    {
222      auto eventScheduler=parentService_->getEventScheduler() ;
223      std::hash<string> hashString ;
224      size_t hashId = hashString(name_) ;
225      size_t currentTimeLine=0 ;
226      if (eventScheduler->queryEvent(currentTimeLine,hashId))
227      {
228        eventScheduler->popEvent() ;
229        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
230        hasNotification_=false ;
231      }
232    }
233  }
234
235  bool CServerContext::eventLoop(bool serviceOnly)
236  {
237    CTimer::get("CServerContext::eventLoop").resume();
238    bool finished=false ;
239    int flag ;
240    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
241
242//    double time=MPI_Wtime() ;
243//    if (time-lastEventLoop_ > eventLoopLatency_)
244//    {
245      if (winNotify_!=nullptr) checkNotifications() ;
246//      lastEventLoop_=time ;
247//    }
248
249
250    if (!serviceOnly && context_!=nullptr) 
251    {
252      if (context_->eventLoop())
253      {
254        info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
255        CContext::removeContext(context_->getId()) ;
256        context_=nullptr ;
257        // destroy context ??? --> later
258      }
259    }
260    CTimer::get("CServerContext::eventLoop").suspend();
261    if (context_==nullptr && finalizeSignal_) finished=true ;
262    return finished ;
263  }
264
265  void CServerContext::createIntercomm(void)
266  {
267    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ;
268
269     MPI_Comm interCommServer, interCommClient ;
270     auto& arg=notifyInCreateIntercomm_ ;
271     int remoteLeader=get<0>(arg) ;
272     string sourceContext=get<1>(arg) ;
273
274     auto it=overlapedComm_.find(sourceContext) ;
275     int overlap=0 ;
276     if (it!=overlapedComm_.end())
277     {
278       get<0>(it->second)=true ;
279       overlap=1 ;
280     }
281     int nOverlap ; 
282     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
283     int commSize ;
284     MPI_Comm_size(contextComm_,&commSize ) ;
285
286    if (nOverlap==commSize)
287    {
288      info(10)<<"CServerContext::createIntercomm : total overlap ==> context in attached mode"<<endl ;
289      isAttachedMode_=true ;
290      interCommClient=get<2>(it->second) ;
291      interCommServer=get<1>(it->second) ;
292      context_ -> createClientInterComm(interCommClient, interCommServer ) ;
293      clientsInterComm_.push_back(interCommClient) ;
294      clientsInterComm_.push_back(interCommServer) ;
295    }
296    else if (nOverlap==0)
297    { 
298      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
299      isAttachedMode_=false ;
300      MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
301      MPI_Comm_dup(interCommServer,&interCommClient) ;
302      context_ -> createClientInterComm(interCommClient,interCommServer) ;
303      clientsInterComm_.push_back(interCommClient) ;
304      clientsInterComm_.push_back(interCommServer) ;
305    }
306    else
307    {
308      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : partial overlap ==> not managed") ;
309    }
310   
311  }
312
313  void CServerContext::freeComm(void)
314  {
315    delete winNotify_ ;
316    winNotify_=nullptr ;
317    MPI_Comm_free(&contextComm_) ;
318    // don't forget intercomm -> later
319  }
320 
321  void CServerContext::finalizeSignal(void)
322  {
323    finalizeSignal_=true ;
324  }
325
326}
Note: See TracBrowser for help on using the repository browser.