source: XIOS3/trunk/src/manager/server_context.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 6 weeks ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.5 KB
Line 
1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8#include "thread_manager.hpp"
9#include "timer.hpp"
10
11
12namespace xios
13{
14  using namespace std ;
15  extern CLogType logTimers ;
16
17  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
18
19  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
20                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
21                                 hasNotification_(false)
22  {
23   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ;
24   int localRank, globalRank, commSize ;
25
26    xios::MPI_Comm_dup(contextComm, &contextComm_) ;
27    CXios::getMpiGarbageCollector().registerCommunicator(contextComm_) ;
28    xiosComm_=CXios::getXiosComm() ;
29 
30    MPI_Comm_rank(xiosComm_,&globalRank) ;
31    MPI_Comm_rank(contextComm_,&localRank) ;
32 
33    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_,"CServerContext::winNotify_") ;
34    MPI_Barrier(contextComm_) ;
35   
36    int type;
37    if (localRank==localLeader_) 
38    {
39      globalLeader_=globalRank ;
40      MPI_Comm_rank(contextComm_,&commSize) ;
41     
42      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
43      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
44      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
45      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
46    }
47    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
48    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
49    context_=CContext::create(name_);
50
51    context_->init(this, contextComm, type) ;
52
53    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
54                        <<" and global rank "<<globalRank<<endl  ;
55   
56    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServerContext::threadEventLoop, this) ;
57  }
58
59  CServerContext::~CServerContext()
60  {
61    delete winNotify_ ;
62    cout<<"Server Context destructor"<<endl;
63  } 
64
65  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
66                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
67  {
68    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ;
69    int intraCommRank ;
70    MPI_Comm_rank(intraComm, &intraCommRank) ;
71    int contextLeader ;
72
73    bool ok ;
74    int type ;
75   
76
77    if (intraCommRank==0)
78    {
79      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
80      if (ok) 
81      {
82        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
83        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
84        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
85      }
86    }
87   
88    if (wait)
89    {
90      MPI_Request req ;
91      MPI_Status status ;
92      MPI_Ibarrier(intraComm,&req) ;
93   
94      int flag=false ;
95      while(!flag) 
96      {
97        CXios::getDaemonsManager()->servicesEventLoop() ;
98        MPI_Test(&req,&flag,&status) ;
99      }
100    }
101   
102    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
103
104    if (ok) 
105    {
106      MPI_Comm newInterCommClient, newInterCommServer ;
107      xios::MPI_Comm_dup(contextComm_,&newInterCommClient) ;
108      xios::MPI_Comm_dup(contextComm_,&newInterCommServer) ;
109      overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
110      MPI_Barrier(contextComm_) ;
111
112      int globalRank ;
113      MPI_Comm_rank(xiosComm_,&globalRank) ;
114      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
115     
116      int overlap, nOverlap ;
117      if (contextLeader==globalRank) overlap=1 ;
118      else overlap=0 ;
119      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
120/*
121      int overlap  ;
122      if (get<0>(overlapedComm_[name_])) overlap=1 ;
123      else overlap=0 ;
124
125      int nOverlap ; 
126      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
127      int commSize ;
128      MPI_Comm_size(contextComm_,&commSize ) ;
129*/
130      if (nOverlap==0)
131      { 
132        xios::MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
133        CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
134        xios::MPI_Comm_dup(interCommClient, &interCommServer) ;
135        CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
136        xios::MPI_Comm_free(&newInterCommClient) ;
137        xios::MPI_Comm_free(&newInterCommServer) ;
138      }
139      else
140      {
141        ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
142      }
143    }
144    overlapedComm_.erase(name_) ;
145    return ok ;
146  }
147
148
149  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
150  {
151     int commSize ;
152     MPI_Comm_size(contextComm_,&commSize) ;
153     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ;
154   
155     for(int rank=0; rank<commSize; rank++)
156     {
157       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
158       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
159       sendNotification(rank) ;
160     }
161  }
162 
163  void CServerContext::sendNotification(int rank)
164  {
165    winNotify_->pushToExclusiveWindow(rank, this, &CServerContext::notificationsDumpOut) ;
166  }
167
168 
169  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
170  {
171   
172    buffer.realloc(maxBufferSize_) ;
173   
174    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
175    {
176      auto& arg=notifyOutCreateIntercomm_ ;
177      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
178    }
179  }
180
181  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
182  {
183    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
184    else
185    {
186      buffer>>notifyInType_;
187      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
188      {
189        auto& arg=notifyInCreateIntercomm_ ;
190        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
191      }
192    }
193  }
194
195  void CServerContext::checkNotifications(void)
196  {
197    if (!hasNotification_)
198    {
199      double time=MPI_Wtime() ;
200      if (time-lastEventLoop_ > eventLoopLatency_) 
201      {
202        int commRank ;
203        MPI_Comm_rank(contextComm_, &commRank) ;
204        winNotify_->popFromExclusiveWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
205       
206        if (notifyInType_!= NOTIFY_NOTHING)
207        {
208          hasNotification_=true ;
209          auto eventScheduler=parentService_->getEventScheduler() ;
210          std::hash<string> hashString ;
211          size_t hashId = hashString(name_) ;
212          size_t currentTimeLine=0 ;
213          eventScheduler->registerEvent(currentTimeLine,hashId); 
214        }
215        lastEventLoop_=time ;
216      }
217    }
218   
219    if (hasNotification_)
220    {
221      auto eventScheduler=parentService_->getEventScheduler() ;
222      std::hash<string> hashString ;
223      size_t hashId = hashString(name_) ;
224      size_t currentTimeLine=0 ;
225      if (eventScheduler->queryEvent(currentTimeLine,hashId))
226      {
227        eventScheduler->popEvent() ;
228        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
229        hasNotification_=false ;
230      }
231    }
232  }
233
234  bool CServerContext::eventLoop(bool serviceOnly)
235  {
236    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").resume();
237    bool finished=false ;
238    int flag ;
239    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
240
241//    double time=MPI_Wtime() ;
242//    if (time-lastEventLoop_ > eventLoopLatency_)
243//    {
244      if (winNotify_!=nullptr) checkNotifications() ;
245//      lastEventLoop_=time ;
246//    }
247
248
249    if (!serviceOnly && context_!=nullptr) 
250    {
251      if (context_->eventLoop())
252      {
253        info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
254        CContext::removeContext(context_->getId()) ;
255        context_=nullptr ;
256        // destroy context ??? --> later
257      }
258    }
259    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").suspend();
260    if (context_==nullptr && finalizeSignal_) finished=true ;
261    return finished ;
262  }
263
264  void CServerContext::threadEventLoop(void)
265  {
266   
267    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").resume();
268    info(100)<<"Launch Thread for CServerContext::threadEventLoop, context id = "<<context_->getId()<<endl ;
269    CThreadManager::threadInitialize() ; 
270    do
271    {
272      int flag ;
273      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
274
275      if (winNotify_!=nullptr) checkNotifications() ;
276
277
278      if (context_!=nullptr) 
279      {
280        if (context_->eventLoop())
281        {
282          info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
283          CContext::removeContext(context_->getId()) ;
284          context_=nullptr ;
285          // destroy context ??? --> later
286        }
287      }
288      if (context_==nullptr && finalizeSignal_) finished_=true ;
289 
290      if (!finished_) CThreadManager::yield() ;
291    }
292    while (!finished_) ;
293   
294    CThreadManager::threadFinalize() ;
295    info(100)<<"Close thread for CServerContext::threadEventLoop"<<endl ;
296    if (info.isActive(logTimers)) CTimer::get("CServerContext::eventLoop").suspend();
297  }
298
299  void CServerContext::createIntercomm(void)
300  {
301    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ;
302
303     MPI_Comm interCommServer, interCommClient ;
304     auto& arg=notifyInCreateIntercomm_ ;
305     int remoteLeader=get<0>(arg) ;
306     string sourceContext=get<1>(arg) ;
307
308     auto it=overlapedComm_.find(sourceContext) ;
309     int overlap=0 ;
310     if (it!=overlapedComm_.end())
311     {
312       get<0>(it->second)=true ;
313       overlap=1 ;
314     }
315     int nOverlap ; 
316     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
317     int commSize ;
318     MPI_Comm_size(contextComm_,&commSize ) ;
319
320    if (nOverlap==0)
321    { 
322      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
323      xios::MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
324      CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
325      xios::MPI_Comm_dup(interCommServer,&interCommClient) ;
326      CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
327      context_ -> createClientInterComm(interCommClient,interCommServer) ;
328      clientsInterComm_.push_back(interCommClient) ;
329      clientsInterComm_.push_back(interCommServer) ;
330    }
331    else
332    {
333      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
334    }
335   
336  }
337
338  void CServerContext::freeComm(void)
339  {
340    //delete winNotify_ ;
341    //winNotify_=nullptr ;
342    //xios::MPI_Comm_free(&contextComm_) ;
343    // don't forget intercomm -> later
344  }
345 
346  void CServerContext::finalizeSignal(void)
347  {
348    finalizeSignal_=true ;
349  }
350
351}
Note: See TracBrowser for help on using the repository browser.