source: XIOS3/trunk/src/manager/services.cpp @ 2589

Last change on this file since 2589 was 2589, checked in by jderouillat, 7 months ago

Specify the usage of the xios namespace to overload the MPI funtions

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 9.9 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "thread_manager.hpp"
8#include "timer.hpp"
9
10namespace xios
11{
12  CService::CService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
13                     int type, int nbPartitions) 
14                         : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
15                           partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
16
17
18  {
19    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
20   
21    int localRank, globalRank, commSize ;
22
23    xios::MPI_Comm_dup(serviceComm, &serviceComm_) ;
24    CXios::getMpiGarbageCollector().registerCommunicator(serviceComm_) ;
25    MPI_Comm globalComm_=CXios::getXiosComm() ;
26 
27    MPI_Comm_rank(globalComm_,&globalRank) ;
28    MPI_Comm_rank(serviceComm_,&localRank) ;
29   
30    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_,"CService::winNotify_") ;
31    winNotify_->updateToExclusiveWindow(localRank, this, &CService::createContextDumpOut) ;
32    MPI_Barrier(serviceComm_) ;
33    if (localRank==localLeader_) 
34    {
35      globalLeader_=globalRank ;
36      MPI_Comm_rank(serviceComm_,&commSize) ;
37      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
38    }
39    if (eventScheduler) eventScheduler_ = eventScheduler ;
40    else eventScheduler_ = make_shared<CEventScheduler>(serviceComm_) ;
41
42    ostringstream oss;
43    oss<<partitionId;
44    name_= poolId+"__"+serviceId+"_"+oss.str();
45   
46    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CService::threadEventLoop, this) ;
47  }
48
49  CService::~CService()
50  {
51    delete winNotify_ ;
52    for(auto& it : contexts_) delete it.second ;
53  }
54
55
56  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
57  {
58    int commSize ;
59    MPI_Comm_size(serviceComm_, &commSize) ;
60    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
61
62    for(int rank=0; rank<commSize; rank++) 
63    {
64      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
65      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
66      sendNotification(rank) ;
67    }
68    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
69  }
70/*
71  void CService::createContext(const std::string& contextId)
72  {
73    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
74  }
75*/
76  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
77  {
78    winNotify_->lockWindowExclusive(rank) ;
79    winNotify_->updateFromLockedWindow(rank, this, &CService::createContextDumpIn) ;
80    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
81    winNotify_->updateToLockedWindow(rank, this, &CService::createContextDumpOut) ; 
82    winNotify_->unlockWindowExclusive(rank) ;   
83  }
84
85
86  void CService::createContextDumpOut(CBufferOut& buffer)
87  {
88    buffer.realloc(maxBufferSize_) ;
89   
90    buffer << (int) (notifications_.size());
91   
92    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
93      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
94  }
95
96
97  void CService::createContextDumpIn(CBufferIn& buffer)
98  {
99    std::string poolId ;
100    std::string serviceId ;
101    int partitionId ;
102    std::string contextId ;
103   
104    notifications_.clear() ;
105    int nbNotifications ;
106    buffer>>nbNotifications ;
107    for(int i=0;i<nbNotifications;i++) 
108    {
109      buffer>>poolId>>serviceId>>partitionId>>contextId ;
110      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
111    }
112  }
113
114  bool CService::eventLoop(bool serviceOnly)
115  {
116    //checkCreateContextNotification() ;
117    CTimer::get("CService::eventLoop").resume();
118    int flag ;
119    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
120   
121//    double time=MPI_Wtime() ;
122//    if (time-lastEventLoop_ > eventLoopLatency_)
123//    {
124      checkNotifications() ;
125//      lastEventLoop_=time ;
126//    }
127
128
129    eventScheduler_->checkEvent() ;
130   
131    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
132    {
133      if (it->second->eventLoop(serviceOnly))
134      {
135        delete it->second ; 
136        contexts_.erase(it) ;
137        // destroy server_context -> to do later
138        break ;
139      } ;
140    }
141 
142    CTimer::get("CService::eventLoop").suspend();
143    if (contexts_.empty() && finalizeSignal_) return true ;
144    else return false ;
145  }
146
147  void CService::threadEventLoop(void)
148  {
149    info(100)<<"Launch Thread for  CService::threadEventLoop, service id = "<<name_<<endl ;
150    CThreadManager::threadInitialize() ; 
151   
152    do
153    {
154      CTimer::get("CService::eventLoop").resume();
155      int flag ;
156      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
157   
158//    double time=MPI_Wtime() ;
159//    if (time-lastEventLoop_ > eventLoopLatency_)
160//    {
161      checkNotifications() ;
162//      lastEventLoop_=time ;
163//    }
164
165
166      eventScheduler_->checkEvent() ;
167   
168      for(auto it=contexts_.begin();it!=contexts_.end();++it) 
169      {
170        if (it->second->isFinished())
171        {
172          delete it->second ; 
173          contexts_.erase(it) ;
174          // destroy server_context -> to do later
175          break ;
176        } ;
177      }
178
179      CTimer::get("CService::eventLoop").suspend();
180      if (contexts_.empty() && finalizeSignal_) finished_=true ;
181      if (!finished_) CThreadManager::yield() ;
182    } while (!finished_) ;
183   
184    CThreadManager::threadFinalize() ;
185    info(100)<<"Close thread for  CService::threadEventLoop, service id = "<<name_<<endl ;
186  }
187
188
189  void CService::sendNotification(int rank)
190  {
191    winNotify_->pushToExclusiveWindow(rank, this, &CService::notificationsDumpOut) ;
192  }
193
194 
195  void CService::notificationsDumpOut(CBufferOut& buffer)
196  {
197   
198    buffer.realloc(maxBufferSize_) ;
199   
200    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
201    {
202      auto& arg=notifyOutCreateContext_ ;
203      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
204    }
205  }
206
207  void CService::notificationsDumpIn(CBufferIn& buffer)
208  {
209    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
210    else
211    {
212      buffer>>notifyInType_;
213      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
214      {
215        auto& arg=notifyInCreateContext_ ;
216        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
217      }
218    }
219  }
220
221
222
223
224  void CService::checkNotifications(void)
225  {
226    if (!hasNotification_)
227    {
228      double time=MPI_Wtime() ;
229      if (time-lastEventLoop_ > eventLoopLatency_) 
230      {
231        int commRank ;
232        MPI_Comm_rank(serviceComm_, &commRank) ;
233        winNotify_->popFromExclusiveWindow(commRank, this, &CService::notificationsDumpIn) ;
234       
235        if (notifyInType_!= NOTIFY_NOTHING)
236        {
237          hasNotification_=true ;
238          std::hash<string> hashString ;
239          size_t hashId = hashString(name_) ;
240          size_t currentTimeLine=0 ;
241          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : timeLine : "<<currentTimeLine<<"  hashId : "<<hashId<<endl ;
242          eventScheduler_->registerEvent(currentTimeLine,hashId); 
243        }
244        lastEventLoop_=time ;
245      }
246    }
247   
248    if (hasNotification_)
249    {
250      std::hash<string> hashString ;
251      size_t hashId = hashString(name_) ;
252      size_t currentTimeLine=0 ;
253//      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
254      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
255      {
256        eventScheduler_->popEvent() ;
257        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
258        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
259        hasNotification_=false ;
260      }
261    }
262  }
263
264
265
266//ym not use any more
267  void CService::checkCreateContextNotification(void)
268  {
269    int commRank ;
270    MPI_Comm_rank(serviceComm_, &commRank) ;
271    winNotify_->lockWindowExclusive(commRank) ;
272    winNotify_->updateFromLockedWindow(commRank, this, &CService::createContextDumpIn) ;
273   
274    if (!notifications_.empty())
275    {
276      auto info = notifications_.front() ;
277      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
278      notifications_.pop_front() ;
279      winNotify_->updateToLockedWindow(commRank, this, &CService::createContextDumpOut) ;     
280    }
281    winNotify_->unlockWindowExclusive(commRank) ;
282  }
283
284  void CService::createContext(void)
285   {
286     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
287     auto& arg=notifyInCreateContext_ ;
288     string poolId = get<0>(arg) ;
289     string serviceId = get<1>(arg) ;
290     int partitionId = get<2>(arg) ;
291     string contextId = get<3>(arg) ;
292     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ;
293   }
294
295   //to remove, not used anymore
296   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
297   {
298     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
299   }
300
301  void CService::finalizeSignal(void)
302  {
303    finalizeSignal_=true ;
304    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
305  }
306
307  shared_ptr<CEventScheduler> CService::getEventScheduler(void)
308  {
309    return eventScheduler_ ;
310  }
311}
Note: See TracBrowser for help on using the repository browser.