source: XIOS3/trunk/src/manager/server_context.cpp @ 2589

Last change on this file since 2589 was 2589, checked in by jderouillat, 7 months ago

Specify the usage of the xios namespace to overload the MPI funtions

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.3 KB
Line 
1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8#include "thread_manager.hpp"
9#include "timer.hpp"
10
11
12namespace xios
13{
14  using namespace std ;
15
16  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
17
18  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
19                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
20                                 hasNotification_(false)
21  {
22   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ;
23   int localRank, globalRank, commSize ;
24
25    xios::MPI_Comm_dup(contextComm, &contextComm_) ;
26    CXios::getMpiGarbageCollector().registerCommunicator(contextComm_) ;
27    xiosComm_=CXios::getXiosComm() ;
28 
29    MPI_Comm_rank(xiosComm_,&globalRank) ;
30    MPI_Comm_rank(contextComm_,&localRank) ;
31 
32    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_,"CServerContext::winNotify_") ;
33    MPI_Barrier(contextComm_) ;
34   
35    int type;
36    if (localRank==localLeader_) 
37    {
38      globalLeader_=globalRank ;
39      MPI_Comm_rank(contextComm_,&commSize) ;
40     
41      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
42      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
43      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
44      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
45    }
46    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
47    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
48    context_=CContext::create(name_);
49
50    context_->init(this, contextComm, type) ;
51
52    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
53                        <<" and global rank "<<globalRank<<endl  ;
54   
55    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServerContext::threadEventLoop, this) ;
56  }
57
58  CServerContext::~CServerContext()
59  {
60    delete winNotify_ ;
61    cout<<"Server Context destructor"<<endl;
62  } 
63
64  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
65                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
66  {
67    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ;
68    int intraCommRank ;
69    MPI_Comm_rank(intraComm, &intraCommRank) ;
70    int contextLeader ;
71
72    bool ok ;
73    int type ;
74   
75
76    if (intraCommRank==0)
77    {
78      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
79      if (ok) 
80      {
81        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
82        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
83        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
84      }
85    }
86   
87    if (wait)
88    {
89      MPI_Request req ;
90      MPI_Status status ;
91      MPI_Ibarrier(intraComm,&req) ;
92   
93      int flag=false ;
94      while(!flag) 
95      {
96        CXios::getDaemonsManager()->servicesEventLoop() ;
97        MPI_Test(&req,&flag,&status) ;
98      }
99    }
100   
101    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
102
103    if (ok) 
104    {
105      MPI_Comm newInterCommClient, newInterCommServer ;
106      xios::MPI_Comm_dup(contextComm_,&newInterCommClient) ;
107      xios::MPI_Comm_dup(contextComm_,&newInterCommServer) ;
108      overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
109      MPI_Barrier(contextComm_) ;
110
111      int globalRank ;
112      MPI_Comm_rank(xiosComm_,&globalRank) ;
113      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
114     
115      int overlap, nOverlap ;
116      if (contextLeader==globalRank) overlap=1 ;
117      else overlap=0 ;
118      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
119/*
120      int overlap  ;
121      if (get<0>(overlapedComm_[name_])) overlap=1 ;
122      else overlap=0 ;
123
124      int nOverlap ; 
125      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
126      int commSize ;
127      MPI_Comm_size(contextComm_,&commSize ) ;
128*/
129      if (nOverlap==0)
130      { 
131        xios::MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
132        CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
133        xios::MPI_Comm_dup(interCommClient, &interCommServer) ;
134        CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
135        xios::MPI_Comm_free(&newInterCommClient) ;
136        xios::MPI_Comm_free(&newInterCommServer) ;
137      }
138      else
139      {
140        ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
141      }
142    }
143    overlapedComm_.erase(name_) ;
144    return ok ;
145  }
146
147
148  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
149  {
150     int commSize ;
151     MPI_Comm_size(contextComm_,&commSize) ;
152     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ;
153   
154     for(int rank=0; rank<commSize; rank++)
155     {
156       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
157       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
158       sendNotification(rank) ;
159     }
160  }
161 
162  void CServerContext::sendNotification(int rank)
163  {
164    winNotify_->pushToExclusiveWindow(rank, this, &CServerContext::notificationsDumpOut) ;
165  }
166
167 
168  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
169  {
170   
171    buffer.realloc(maxBufferSize_) ;
172   
173    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
174    {
175      auto& arg=notifyOutCreateIntercomm_ ;
176      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
177    }
178  }
179
180  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
181  {
182    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
183    else
184    {
185      buffer>>notifyInType_;
186      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
187      {
188        auto& arg=notifyInCreateIntercomm_ ;
189        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
190      }
191    }
192  }
193
194  void CServerContext::checkNotifications(void)
195  {
196    if (!hasNotification_)
197    {
198      double time=MPI_Wtime() ;
199      if (time-lastEventLoop_ > eventLoopLatency_) 
200      {
201        int commRank ;
202        MPI_Comm_rank(contextComm_, &commRank) ;
203        winNotify_->popFromExclusiveWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
204       
205        if (notifyInType_!= NOTIFY_NOTHING)
206        {
207          hasNotification_=true ;
208          auto eventScheduler=parentService_->getEventScheduler() ;
209          std::hash<string> hashString ;
210          size_t hashId = hashString(name_) ;
211          size_t currentTimeLine=0 ;
212          eventScheduler->registerEvent(currentTimeLine,hashId); 
213        }
214        lastEventLoop_=time ;
215      }
216    }
217   
218    if (hasNotification_)
219    {
220      auto eventScheduler=parentService_->getEventScheduler() ;
221      std::hash<string> hashString ;
222      size_t hashId = hashString(name_) ;
223      size_t currentTimeLine=0 ;
224      if (eventScheduler->queryEvent(currentTimeLine,hashId))
225      {
226        eventScheduler->popEvent() ;
227        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
228        hasNotification_=false ;
229      }
230    }
231  }
232
233  bool CServerContext::eventLoop(bool serviceOnly)
234  {
235    CTimer::get("CServerContext::eventLoop").resume();
236    bool finished=false ;
237    int flag ;
238    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
239
240//    double time=MPI_Wtime() ;
241//    if (time-lastEventLoop_ > eventLoopLatency_)
242//    {
243      if (winNotify_!=nullptr) checkNotifications() ;
244//      lastEventLoop_=time ;
245//    }
246
247
248    if (!serviceOnly && context_!=nullptr) 
249    {
250      if (context_->eventLoop())
251      {
252        info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
253        CContext::removeContext(context_->getId()) ;
254        context_=nullptr ;
255        // destroy context ??? --> later
256      }
257    }
258    CTimer::get("CServerContext::eventLoop").suspend();
259    if (context_==nullptr && finalizeSignal_) finished=true ;
260    return finished ;
261  }
262
263  void CServerContext::threadEventLoop(void)
264  {
265   
266    info(100)<<"Launch Thread for CServerContext::threadEventLoop, context id = "<<context_->getId()<<endl ;
267    CThreadManager::threadInitialize() ; 
268    do
269    {
270      CTimer::get("CServerContext::eventLoop").resume();
271      int flag ;
272      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
273
274      if (winNotify_!=nullptr) checkNotifications() ;
275
276
277      if (context_!=nullptr) 
278      {
279        if (context_->eventLoop())
280        {
281          info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
282          CContext::removeContext(context_->getId()) ;
283          context_=nullptr ;
284          // destroy context ??? --> later
285        }
286      }
287      CTimer::get("CServerContext::eventLoop").suspend();
288      if (context_==nullptr && finalizeSignal_) finished_=true ;
289 
290      if (!finished_) CThreadManager::yield() ;
291    }
292    while (!finished_) ;
293   
294    CThreadManager::threadFinalize() ;
295    info(100)<<"Close thread for CServerContext::threadEventLoop"<<endl ;
296  }
297
298  void CServerContext::createIntercomm(void)
299  {
300    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ;
301
302     MPI_Comm interCommServer, interCommClient ;
303     auto& arg=notifyInCreateIntercomm_ ;
304     int remoteLeader=get<0>(arg) ;
305     string sourceContext=get<1>(arg) ;
306
307     auto it=overlapedComm_.find(sourceContext) ;
308     int overlap=0 ;
309     if (it!=overlapedComm_.end())
310     {
311       get<0>(it->second)=true ;
312       overlap=1 ;
313     }
314     int nOverlap ; 
315     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
316     int commSize ;
317     MPI_Comm_size(contextComm_,&commSize ) ;
318
319    if (nOverlap==0)
320    { 
321      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
322      xios::MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
323      CXios::getMpiGarbageCollector().registerCommunicator(interCommServer) ;
324      xios::MPI_Comm_dup(interCommServer,&interCommClient) ;
325      CXios::getMpiGarbageCollector().registerCommunicator(interCommClient) ;
326      context_ -> createClientInterComm(interCommClient,interCommServer) ;
327      clientsInterComm_.push_back(interCommClient) ;
328      clientsInterComm_.push_back(interCommServer) ;
329    }
330    else
331    {
332      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
333    }
334   
335  }
336
337  void CServerContext::freeComm(void)
338  {
339    //delete winNotify_ ;
340    //winNotify_=nullptr ;
341    //xios::MPI_Comm_free(&contextComm_) ;
342    // don't forget intercomm -> later
343  }
344 
345  void CServerContext::finalizeSignal(void)
346  {
347    finalizeSignal_=true ;
348  }
349
350}
Note: See TracBrowser for help on using the repository browser.