source: XIOS3/trunk/src/manager/server_context.cpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 9 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.8 KB
Line 
1#include "server_context.hpp"
2#include "contexts_manager.hpp"
3#include "cxios.hpp"
4#include "mpi.hpp"
5#include "context.hpp"
6#include "register_context_info.hpp"
7#include "services.hpp"
8#include "thread_manager.hpp"
9#include "timer.hpp"
10
11
12namespace xios
13{
14  using namespace std ;
15
16  map<string, tuple<bool,MPI_Comm,MPI_Comm> > CServerContext::overlapedComm_ ;
17
18  CServerContext::CServerContext(CService* parentService, MPI_Comm contextComm, const std::string& poolId, const std::string& serviceId, 
19                                 const int& partitionId, const std::string& contextId) : finalizeSignal_(false), parentService_(parentService),
20                                 hasNotification_(false)
21  {
22   info(40)<<"CCServerContext::CServerContext  : new context creation ; contextId : "<<contextId<<endl ;
23   int localRank, globalRank, commSize ;
24
25    MPI_Comm_dup(contextComm, &contextComm_) ;
26    xiosComm_=CXios::getXiosComm() ;
27 
28    MPI_Comm_rank(xiosComm_,&globalRank) ;
29    MPI_Comm_rank(contextComm_,&localRank) ;
30 
31    winNotify_ = new CWindowManager(contextComm_, maxBufferSize_) ;
32    MPI_Barrier(contextComm_) ;
33   
34    int type;
35    if (localRank==localLeader_) 
36    {
37      globalLeader_=globalRank ;
38      MPI_Comm_rank(contextComm_,&commSize) ;
39     
40      CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
41      SRegisterContextInfo contextInfo = {poolId, serviceId, partitionId, type, contextId, commSize, globalLeader_} ;
42      name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
43      CXios::getContextsManager()->registerContext(name_, contextInfo) ;
44    }
45    MPI_Bcast(&type, 1, MPI_INT, localLeader_,contextComm_) ;
46    name_ = CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
47    context_=CContext::create(name_);
48
49    context_->init(this, contextComm, type) ;
50
51    info(10)<<"Context "<< CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId)<<" created, on local rank "<<localRank
52                        <<" and global rank "<<globalRank<<endl  ;
53   
54    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServerContext::threadEventLoop, this) ;
55  }
56
57  CServerContext::~CServerContext()
58  {
59    cout<<"Server Context destructor"<<endl;
60  } 
61
62  bool CServerContext::createIntercomm(const string& poolId, const string& serviceId, const int& partitionId, const string& contextId, 
63                                       const MPI_Comm& intraComm, MPI_Comm& interCommClient, MPI_Comm& interCommServer, bool wait)
64  {
65    info(40)<<"CServerContext::createIntercomm  : context intercomm creation ; contextId : "<<contextId<<endl ;
66    int intraCommRank ;
67    MPI_Comm_rank(intraComm, &intraCommRank) ;
68    int contextLeader ;
69
70    bool ok ;
71    int type ;
72    MPI_Comm newInterCommClient, newInterCommServer ;
73    MPI_Comm_dup(contextComm_,&newInterCommClient) ;
74    MPI_Comm_dup(contextComm_,&newInterCommServer) ;
75    overlapedComm_[name_]=tuple<bool, MPI_Comm, MPI_Comm>(false, newInterCommClient, newInterCommServer) ;
76    MPI_Barrier(contextComm_) ;
77
78    if (intraCommRank==0)
79    {
80      ok=CXios::getContextsManager()->createServerContextIntercomm(poolId, serviceId, partitionId, contextId, name_, wait) ;
81      if (ok) 
82      {
83        CXios::getServicesManager()->getServiceType(poolId,serviceId, 0, type) ;
84        string name=CXios::getContextsManager()->getServerContextName(poolId, serviceId, partitionId, type, contextId) ;
85        CXios::getContextsManager()->getContextLeader(name, contextLeader) ;
86      }
87    }
88   
89    if (wait)
90    {
91      MPI_Request req ;
92      MPI_Status status ;
93      MPI_Ibarrier(intraComm,&req) ;
94   
95      int flag=false ;
96      while(!flag) 
97      {
98        CXios::getDaemonsManager()->servicesEventLoop() ;
99        MPI_Test(&req,&flag,&status) ;
100      }
101    }
102   
103    MPI_Bcast(&ok, 1, MPI_INT, 0, intraComm) ;
104
105    if (ok) 
106    {
107      int globalRank ;
108      MPI_Comm_rank(xiosComm_,&globalRank) ;
109      MPI_Bcast(&contextLeader, 1, MPI_INT, 0, intraComm) ;
110     
111      int overlap, nOverlap ;
112      if (contextLeader==globalRank) overlap=1 ;
113      else overlap=0 ;
114      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
115/*
116      int overlap  ;
117      if (get<0>(overlapedComm_[name_])) overlap=1 ;
118      else overlap=0 ;
119
120      int nOverlap ; 
121      MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
122      int commSize ;
123      MPI_Comm_size(contextComm_,&commSize ) ;
124*/
125      if (nOverlap==0)
126      { 
127        MPI_Intercomm_create(intraComm, 0, xiosComm_, contextLeader, 3141, &interCommClient) ;
128        MPI_Comm_dup(interCommClient, &interCommServer) ;
129        MPI_Comm_free(&newInterCommClient) ;
130        MPI_Comm_free(&newInterCommServer) ;
131      }
132      else
133      {
134        ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
135      }
136    }
137    overlapedComm_.erase(name_) ;
138    return ok ;
139  }
140
141
142  void CServerContext::createIntercomm(int remoteLeader, const string& sourceContext)
143  {
144     int commSize ;
145     MPI_Comm_size(contextComm_,&commSize) ;
146     info(40)<<"CServerContext::createIntercomm  : notify createContextIntercomm to all context members ; sourceContext : "<<sourceContext<<endl ;
147   
148     for(int rank=0; rank<commSize; rank++)
149     {
150       notifyOutType_=NOTIFY_CREATE_INTERCOMM ;
151       notifyOutCreateIntercomm_ = make_tuple(remoteLeader, sourceContext) ;
152       sendNotification(rank) ;
153     }
154  }
155 
156  void CServerContext::sendNotification(int rank)
157  {
158    winNotify_->pushToExclusiveWindow(rank, this, &CServerContext::notificationsDumpOut) ;
159  }
160
161 
162  void CServerContext::notificationsDumpOut(CBufferOut& buffer)
163  {
164   
165    buffer.realloc(maxBufferSize_) ;
166   
167    if (notifyOutType_==NOTIFY_CREATE_INTERCOMM)
168    {
169      auto& arg=notifyOutCreateIntercomm_ ;
170      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) ;
171    }
172  }
173
174  void CServerContext::notificationsDumpIn(CBufferIn& buffer)
175  {
176    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
177    else
178    {
179      buffer>>notifyInType_;
180      if (notifyInType_==NOTIFY_CREATE_INTERCOMM)
181      {
182        auto& arg=notifyInCreateIntercomm_ ;
183        buffer >> std::get<0>(arg)>> std::get<1>(arg) ;
184      }
185    }
186  }
187
188  void CServerContext::checkNotifications(void)
189  {
190    if (!hasNotification_)
191    {
192      double time=MPI_Wtime() ;
193      if (time-lastEventLoop_ > eventLoopLatency_) 
194      {
195        int commRank ;
196        MPI_Comm_rank(contextComm_, &commRank) ;
197        winNotify_->popFromExclusiveWindow(commRank, this, &CServerContext::notificationsDumpIn) ;
198       
199        if (notifyInType_!= NOTIFY_NOTHING)
200        {
201          hasNotification_=true ;
202          auto eventScheduler=parentService_->getEventScheduler() ;
203          std::hash<string> hashString ;
204          size_t hashId = hashString(name_) ;
205          size_t currentTimeLine=0 ;
206          eventScheduler->registerEvent(currentTimeLine,hashId); 
207        }
208        lastEventLoop_=time ;
209      }
210    }
211   
212    if (hasNotification_)
213    {
214      auto eventScheduler=parentService_->getEventScheduler() ;
215      std::hash<string> hashString ;
216      size_t hashId = hashString(name_) ;
217      size_t currentTimeLine=0 ;
218      if (eventScheduler->queryEvent(currentTimeLine,hashId))
219      {
220        eventScheduler->popEvent() ;
221        if (notifyInType_==NOTIFY_CREATE_INTERCOMM) createIntercomm() ;
222        hasNotification_=false ;
223      }
224    }
225  }
226
227  bool CServerContext::eventLoop(bool serviceOnly)
228  {
229    CTimer::get("CServerContext::eventLoop").resume();
230    bool finished=false ;
231    int flag ;
232    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
233
234//    double time=MPI_Wtime() ;
235//    if (time-lastEventLoop_ > eventLoopLatency_)
236//    {
237      if (winNotify_!=nullptr) checkNotifications() ;
238//      lastEventLoop_=time ;
239//    }
240
241
242    if (!serviceOnly && context_!=nullptr) 
243    {
244      if (context_->eventLoop())
245      {
246        info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
247        CContext::removeContext(context_->getId()) ;
248        context_=nullptr ;
249        // destroy context ??? --> later
250      }
251    }
252    CTimer::get("CServerContext::eventLoop").suspend();
253    if (context_==nullptr && finalizeSignal_) finished=true ;
254    return finished ;
255  }
256
257  void CServerContext::threadEventLoop(void)
258  {
259   
260    info(100)<<"Launch Thread for CServerContext::threadEventLoop, context id = "<<context_->getId()<<endl ;
261    CThreadManager::threadInitialize() ; 
262    do
263    {
264      CTimer::get("CServerContext::eventLoop").resume();
265      int flag ;
266      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
267
268      if (winNotify_!=nullptr) checkNotifications() ;
269
270
271      if (context_!=nullptr) 
272      {
273        if (context_->eventLoop())
274        {
275          info(100)<<"Remove context server with id "<<context_->getId()<<endl ;
276          CContext::removeContext(context_->getId()) ;
277          context_=nullptr ;
278          // destroy context ??? --> later
279        }
280      }
281      CTimer::get("CServerContext::eventLoop").suspend();
282      if (context_==nullptr && finalizeSignal_) finished_=true ;
283 
284      if (!finished_) CThreadManager::yield() ;
285    }
286    while (!finished_) ;
287   
288    CThreadManager::threadFinalize() ;
289    info(100)<<"Close thread for CServerContext::threadEventLoop"<<endl ;
290  }
291
292  void CServerContext::createIntercomm(void)
293  {
294    info(40)<<"CServerContext::createIntercomm  : received createIntercomm notification"<<endl ;
295
296     MPI_Comm interCommServer, interCommClient ;
297     auto& arg=notifyInCreateIntercomm_ ;
298     int remoteLeader=get<0>(arg) ;
299     string sourceContext=get<1>(arg) ;
300
301     auto it=overlapedComm_.find(sourceContext) ;
302     int overlap=0 ;
303     if (it!=overlapedComm_.end())
304     {
305       get<0>(it->second)=true ;
306       overlap=1 ;
307     }
308     int nOverlap ; 
309     MPI_Allreduce(&overlap, &nOverlap, 1, MPI_INT, MPI_SUM, contextComm_) ;
310     int commSize ;
311     MPI_Comm_size(contextComm_,&commSize ) ;
312
313    if (nOverlap==0)
314    { 
315      info(10)<<"CServerContext::createIntercomm : No overlap ==> context in server mode"<<endl ;
316      MPI_Intercomm_create(contextComm_, 0, xiosComm_, remoteLeader, 3141, &interCommServer) ;
317      MPI_Comm_dup(interCommServer,&interCommClient) ;
318      context_ -> createClientInterComm(interCommClient,interCommServer) ;
319      clientsInterComm_.push_back(interCommClient) ;
320      clientsInterComm_.push_back(interCommServer) ;
321    }
322    else
323    {
324      ERROR("void CServerContext::createIntercomm(void)",<<"CServerContext::createIntercomm : overlap ==> not managed") ;
325    }
326   
327  }
328
329  void CServerContext::freeComm(void)
330  {
331    delete winNotify_ ;
332    winNotify_=nullptr ;
333    MPI_Comm_free(&contextComm_) ;
334    // don't forget intercomm -> later
335  }
336 
337  void CServerContext::finalizeSignal(void)
338  {
339    finalizeSignal_=true ;
340  }
341
342}
Note: See TracBrowser for help on using the repository browser.