source: XIOS3/trunk/src/manager/services.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 6 weeks ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 10.1 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "thread_manager.hpp"
8#include "timer.hpp"
9
10namespace xios
11{
12  extern CLogType logTimers ;
13
14  CService::CService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
15                     int type, int nbPartitions) 
16                         : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
17                           partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
18
19
20  {
21    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
22   
23    int localRank, globalRank, commSize ;
24
25    xios::MPI_Comm_dup(serviceComm, &serviceComm_) ;
26    CXios::getMpiGarbageCollector().registerCommunicator(serviceComm_) ;
27    MPI_Comm globalComm_=CXios::getXiosComm() ;
28 
29    MPI_Comm_rank(globalComm_,&globalRank) ;
30    MPI_Comm_rank(serviceComm_,&localRank) ;
31   
32    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_,"CService::winNotify_") ;
33    winNotify_->updateToExclusiveWindow(localRank, this, &CService::createContextDumpOut) ;
34    MPI_Barrier(serviceComm_) ;
35    if (localRank==localLeader_) 
36    {
37      globalLeader_=globalRank ;
38      MPI_Comm_rank(serviceComm_,&commSize) ;
39      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
40    }
41    if (eventScheduler) eventScheduler_ = eventScheduler ;
42    else eventScheduler_ = make_shared<CEventScheduler>(serviceComm_) ;
43
44    ostringstream oss;
45    oss<<partitionId;
46    name_= poolId+"__"+serviceId+"_"+oss.str();
47   
48    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CService::threadEventLoop, this) ;
49  }
50
51  CService::~CService()
52  {
53    delete winNotify_ ;
54    for(auto& it : contexts_) delete it.second ;
55  }
56
57
58  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
59  {
60    int commSize ;
61    MPI_Comm_size(serviceComm_, &commSize) ;
62    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
63
64    for(int rank=0; rank<commSize; rank++) 
65    {
66      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
67      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
68      sendNotification(rank) ;
69    }
70    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
71  }
72/*
73  void CService::createContext(const std::string& contextId)
74  {
75    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
76  }
77*/
78  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
79  {
80    winNotify_->lockWindowExclusive(rank) ;
81    winNotify_->updateFromLockedWindow(rank, this, &CService::createContextDumpIn) ;
82    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
83    winNotify_->updateToLockedWindow(rank, this, &CService::createContextDumpOut) ; 
84    winNotify_->unlockWindowExclusive(rank) ;   
85  }
86
87
88  void CService::createContextDumpOut(CBufferOut& buffer)
89  {
90    buffer.realloc(maxBufferSize_) ;
91   
92    buffer << (int) (notifications_.size());
93   
94    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
95      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
96  }
97
98
99  void CService::createContextDumpIn(CBufferIn& buffer)
100  {
101    std::string poolId ;
102    std::string serviceId ;
103    int partitionId ;
104    std::string contextId ;
105   
106    notifications_.clear() ;
107    int nbNotifications ;
108    buffer>>nbNotifications ;
109    for(int i=0;i<nbNotifications;i++) 
110    {
111      buffer>>poolId>>serviceId>>partitionId>>contextId ;
112      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
113    }
114  }
115
116  bool CService::eventLoop(bool serviceOnly)
117  {
118    //checkCreateContextNotification() ;
119    if (info.isActive(logTimers)) CTimer::get("CService::eventLoop").resume();
120    int flag ;
121    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
122   
123//    double time=MPI_Wtime() ;
124//    if (time-lastEventLoop_ > eventLoopLatency_)
125//    {
126      checkNotifications() ;
127//      lastEventLoop_=time ;
128//    }
129
130
131    eventScheduler_->checkEvent() ;
132   
133    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
134    {
135      if (it->second->eventLoop(serviceOnly))
136      {
137        delete it->second ; 
138        contexts_.erase(it) ;
139        // destroy server_context -> to do later
140        break ;
141      } ;
142    }
143 
144    if (info.isActive(logTimers)) CTimer::get("CService::eventLoop").suspend();
145    if (contexts_.empty() && finalizeSignal_) return true ;
146    else return false ;
147  }
148
149  void CService::threadEventLoop(void)
150  {
151    if (info.isActive(logTimers)) CTimer::get("CService::eventLoop").resume();
152    info(100)<<"Launch Thread for  CService::threadEventLoop, service id = "<<name_<<endl ;
153    CThreadManager::threadInitialize() ; 
154   
155    do
156    {
157      int flag ;
158      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
159   
160//    double time=MPI_Wtime() ;
161//    if (time-lastEventLoop_ > eventLoopLatency_)
162//    {
163      checkNotifications() ;
164//      lastEventLoop_=time ;
165//    }
166
167
168      eventScheduler_->checkEvent() ;
169   
170      for(auto it=contexts_.begin();it!=contexts_.end();++it) 
171      {
172        if (it->second->isFinished())
173        {
174          delete it->second ; 
175          contexts_.erase(it) ;
176          // destroy server_context -> to do later
177          break ;
178        } ;
179      }
180
181      if (contexts_.empty() && finalizeSignal_) finished_=true ;
182      if (!finished_) CThreadManager::yield() ;
183    } while (!finished_) ;
184   
185    CThreadManager::threadFinalize() ;
186    info(100)<<"Close thread for  CService::threadEventLoop, service id = "<<name_<<endl ;
187    if (info.isActive(logTimers)) CTimer::get("CService::eventLoop").suspend();
188  }
189
190
191  void CService::sendNotification(int rank)
192  {
193    winNotify_->pushToExclusiveWindow(rank, this, &CService::notificationsDumpOut) ;
194  }
195
196 
197  void CService::notificationsDumpOut(CBufferOut& buffer)
198  {
199   
200    buffer.realloc(maxBufferSize_) ;
201   
202    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
203    {
204      auto& arg=notifyOutCreateContext_ ;
205      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
206    }
207  }
208
209  void CService::notificationsDumpIn(CBufferIn& buffer)
210  {
211    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
212    else
213    {
214      buffer>>notifyInType_;
215      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
216      {
217        auto& arg=notifyInCreateContext_ ;
218        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
219      }
220    }
221  }
222
223
224
225
226  void CService::checkNotifications(void)
227  {
228    if (!hasNotification_)
229    {
230      double time=MPI_Wtime() ;
231      if (time-lastEventLoop_ > eventLoopLatency_) 
232      {
233        int commRank ;
234        MPI_Comm_rank(serviceComm_, &commRank) ;
235        winNotify_->popFromExclusiveWindow(commRank, this, &CService::notificationsDumpIn) ;
236       
237        if (notifyInType_!= NOTIFY_NOTHING)
238        {
239          hasNotification_=true ;
240          std::hash<string> hashString ;
241          size_t hashId = hashString(name_) ;
242          size_t currentTimeLine=0 ;
243          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : timeLine : "<<currentTimeLine<<"  hashId : "<<hashId<<endl ;
244          eventScheduler_->registerEvent(currentTimeLine,hashId); 
245        }
246        lastEventLoop_=time ;
247      }
248    }
249   
250    if (hasNotification_)
251    {
252      std::hash<string> hashString ;
253      size_t hashId = hashString(name_) ;
254      size_t currentTimeLine=0 ;
255//      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
256      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
257      {
258        eventScheduler_->popEvent() ;
259        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
260        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
261        hasNotification_=false ;
262      }
263    }
264  }
265
266
267
268//ym not use any more
269  void CService::checkCreateContextNotification(void)
270  {
271    int commRank ;
272    MPI_Comm_rank(serviceComm_, &commRank) ;
273    winNotify_->lockWindowExclusive(commRank) ;
274    winNotify_->updateFromLockedWindow(commRank, this, &CService::createContextDumpIn) ;
275   
276    if (!notifications_.empty())
277    {
278      auto info = notifications_.front() ;
279      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
280      notifications_.pop_front() ;
281      winNotify_->updateToLockedWindow(commRank, this, &CService::createContextDumpOut) ;     
282    }
283    winNotify_->unlockWindowExclusive(commRank) ;
284  }
285
286  void CService::createContext(void)
287   {
288     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
289     auto& arg=notifyInCreateContext_ ;
290     string poolId = get<0>(arg) ;
291     string serviceId = get<1>(arg) ;
292     int partitionId = get<2>(arg) ;
293     string contextId = get<3>(arg) ;
294     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ;
295   }
296
297   //to remove, not used anymore
298   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
299   {
300     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
301   }
302
303  void CService::finalizeSignal(void)
304  {
305    finalizeSignal_=true ;
306    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
307  }
308
309  shared_ptr<CEventScheduler> CService::getEventScheduler(void)
310  {
311    return eventScheduler_ ;
312  }
313}
Note: See TracBrowser for help on using the repository browser.