source: XIOS3/trunk/src/manager/services.cpp @ 2523

Last change on this file since 2523 was 2523, checked in by ymipsl, 12 months ago

Adaptation to new hyper event scheduler.
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.6 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "timer.hpp"
8
9namespace xios
10{
11  CService::CService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
12                     int type, int nbPartitions) 
13                         : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
14                           partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
15
16
17  {
18    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
19   
20    int localRank, globalRank, commSize ;
21
22    MPI_Comm_dup(serviceComm, &serviceComm_) ;
23    MPI_Comm globalComm_=CXios::getXiosComm() ;
24 
25    MPI_Comm_rank(globalComm_,&globalRank) ;
26    MPI_Comm_rank(serviceComm_,&localRank) ;
27   
28    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_) ;
29    winNotify_->updateToExclusiveWindow(localRank, this, &CService::createContextDumpOut) ;
30    MPI_Barrier(serviceComm_) ;
31    if (localRank==localLeader_) 
32    {
33      globalLeader_=globalRank ;
34      MPI_Comm_rank(serviceComm_,&commSize) ;
35      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
36    }
37    if (eventScheduler) eventScheduler_ = eventScheduler ;
38    else eventScheduler_ = make_shared<CEventScheduler>(serviceComm_) ;
39
40    ostringstream oss;
41    oss<<partitionId;
42    name_= poolId+"__"+serviceId+"_"+oss.str();
43  }
44
45  CService::~CService()
46  {
47    delete winNotify_ ;
48    for(auto& it : contexts_) delete it.second ;
49  }
50
51
52  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
53  {
54    int commSize ;
55    MPI_Comm_size(serviceComm_, &commSize) ;
56    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
57
58    for(int rank=0; rank<commSize; rank++) 
59    {
60      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
61      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
62      sendNotification(rank) ;
63    }
64    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
65  }
66/*
67  void CService::createContext(const std::string& contextId)
68  {
69    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
70  }
71*/
72  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
73  {
74    winNotify_->lockWindowExclusive(rank) ;
75    winNotify_->updateFromLockedWindow(rank, this, &CService::createContextDumpIn) ;
76    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
77    winNotify_->updateToLockedWindow(rank, this, &CService::createContextDumpOut) ; 
78    winNotify_->unlockWindowExclusive(rank) ;   
79  }
80
81
82  void CService::createContextDumpOut(CBufferOut& buffer)
83  {
84    buffer.realloc(maxBufferSize_) ;
85   
86    buffer << (int) (notifications_.size());
87   
88    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
89      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
90  }
91
92
93  void CService::createContextDumpIn(CBufferIn& buffer)
94  {
95    std::string poolId ;
96    std::string serviceId ;
97    int partitionId ;
98    std::string contextId ;
99   
100    notifications_.clear() ;
101    int nbNotifications ;
102    buffer>>nbNotifications ;
103    for(int i=0;i<nbNotifications;i++) 
104    {
105      buffer>>poolId>>serviceId>>partitionId>>contextId ;
106      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
107    }
108  }
109
110  bool CService::eventLoop(bool serviceOnly)
111  {
112    //checkCreateContextNotification() ;
113    CTimer::get("CService::eventLoop").resume();
114    int flag ;
115    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
116   
117//    double time=MPI_Wtime() ;
118//    if (time-lastEventLoop_ > eventLoopLatency_)
119//    {
120      checkNotifications() ;
121//      lastEventLoop_=time ;
122//    }
123
124
125    eventScheduler_->checkEvent() ;
126    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
127    {
128      if (it->second->eventLoop(serviceOnly))
129      {
130        delete it->second ; 
131        contexts_.erase(it) ;
132        // destroy server_context -> to do later
133        break ;
134      } ;
135    }
136    CTimer::get("CService::eventLoop").suspend();
137    if (contexts_.empty() && finalizeSignal_) return true ;
138    else return false ;
139  }
140
141  void CService::sendNotification(int rank)
142  {
143    winNotify_->pushToExclusiveWindow(rank, this, &CService::notificationsDumpOut) ;
144  }
145
146 
147  void CService::notificationsDumpOut(CBufferOut& buffer)
148  {
149   
150    buffer.realloc(maxBufferSize_) ;
151   
152    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
153    {
154      auto& arg=notifyOutCreateContext_ ;
155      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
156    }
157  }
158
159  void CService::notificationsDumpIn(CBufferIn& buffer)
160  {
161    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
162    else
163    {
164      buffer>>notifyInType_;
165      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
166      {
167        auto& arg=notifyInCreateContext_ ;
168        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
169      }
170    }
171  }
172
173
174
175
176  void CService::checkNotifications(void)
177  {
178    if (!hasNotification_)
179    {
180      double time=MPI_Wtime() ;
181      if (time-lastEventLoop_ > eventLoopLatency_) 
182      {
183        int commRank ;
184        MPI_Comm_rank(serviceComm_, &commRank) ;
185        winNotify_->popFromExclusiveWindow(commRank, this, &CService::notificationsDumpIn) ;
186       
187        if (notifyInType_!= NOTIFY_NOTHING)
188        {
189          hasNotification_=true ;
190          std::hash<string> hashString ;
191          size_t hashId = hashString(name_) ;
192          size_t currentTimeLine=0 ;
193          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : timeLine : "<<currentTimeLine<<"  hashId : "<<hashId<<endl ;
194          eventScheduler_->registerEvent(currentTimeLine,hashId); 
195        }
196        lastEventLoop_=time ;
197      }
198    }
199   
200    if (hasNotification_)
201    {
202      std::hash<string> hashString ;
203      size_t hashId = hashString(name_) ;
204      size_t currentTimeLine=0 ;
205//      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
206      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
207      {
208        eventScheduler_->popEvent() ;
209        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
210        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
211        hasNotification_=false ;
212      }
213    }
214  }
215
216
217
218//ym not use any more
219  void CService::checkCreateContextNotification(void)
220  {
221    int commRank ;
222    MPI_Comm_rank(serviceComm_, &commRank) ;
223    winNotify_->lockWindowExclusive(commRank) ;
224    winNotify_->updateFromLockedWindow(commRank, this, &CService::createContextDumpIn) ;
225   
226    if (!notifications_.empty())
227    {
228      auto info = notifications_.front() ;
229      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
230      notifications_.pop_front() ;
231      winNotify_->updateToLockedWindow(commRank, this, &CService::createContextDumpOut) ;     
232    }
233    winNotify_->unlockWindowExclusive(commRank) ;
234  }
235
236  void CService::createContext(void)
237   {
238     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
239     auto& arg=notifyInCreateContext_ ;
240     string poolId = get<0>(arg) ;
241     string serviceId = get<1>(arg) ;
242     int partitionId = get<2>(arg) ;
243     string contextId = get<3>(arg) ;
244     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ;
245   }
246
247   //to remove, not used anymore
248   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
249   {
250     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
251   }
252
253  void CService::finalizeSignal(void)
254  {
255    finalizeSignal_=true ;
256    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
257  }
258
259  shared_ptr<CEventScheduler> CService::getEventScheduler(void)
260  {
261    return eventScheduler_ ;
262  }
263}
Note: See TracBrowser for help on using the repository browser.