source: XIOS/dev/dev_ym/XIOS_COUPLING/src/manager/services.cpp @ 2258

Last change on this file since 2258 was 2258, checked in by ymipsl, 3 years ago

One sided protocol improvment.
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.3 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "timer.hpp"
8
9namespace xios
10{
11  CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
12                     int type, int nbPartitions) : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
13                                                   partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
14
15
16  {
17    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
18   
19    int localRank, globalRank, commSize ;
20
21    MPI_Comm_dup(serviceComm, &serviceComm_) ;
22    MPI_Comm globalComm_=CXios::getXiosComm() ;
23 
24    MPI_Comm_rank(globalComm_,&globalRank) ;
25    MPI_Comm_rank(serviceComm_,&localRank) ;
26   
27    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_) ;
28    winNotify_->lockWindow(localRank,0) ;
29    winNotify_->updateToWindow(localRank, this, &CService::createContextDumpOut) ;
30    winNotify_->unlockWindow(localRank,0) ;
31    MPI_Barrier(serviceComm_) ;
32    if (localRank==localLeader_) 
33    {
34      globalLeader_=globalRank ;
35      MPI_Comm_rank(serviceComm_,&commSize) ;
36      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
37    }
38    eventScheduler_ = new CEventScheduler(serviceComm_) ;
39
40    ostringstream oss;
41    oss<<partitionId;
42    name_= poolId+"::"+serviceId+"_"+oss.str();
43  }
44
45  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
46  {
47    int commSize ;
48    MPI_Comm_size(serviceComm_, &commSize) ;
49    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
50
51    for(int rank=0; rank<commSize; rank++) 
52    {
53      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
54      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
55      sendNotification(rank) ;
56    }
57    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
58  }
59/*
60  void CService::createContext(const std::string& contextId)
61  {
62    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
63  }
64*/
65  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
66  {
67    winNotify_->lockWindow(rank,0) ;
68    winNotify_->updateFromWindow(rank, this, &CService::createContextDumpIn) ;
69    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
70    winNotify_->updateToWindow(rank, this, &CService::createContextDumpOut) ; 
71    winNotify_->unlockWindow(rank,0) ;   
72  }
73
74
75  void CService::createContextDumpOut(CBufferOut& buffer)
76  {
77    buffer.realloc(maxBufferSize_) ;
78   
79    buffer << (int) (notifications_.size());
80   
81    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
82      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
83  }
84
85
86  void CService::createContextDumpIn(CBufferIn& buffer)
87  {
88    std::string poolId ;
89    std::string serviceId ;
90    int partitionId ;
91    std::string contextId ;
92   
93    notifications_.clear() ;
94    int nbNotifications ;
95    buffer>>nbNotifications ;
96    for(int i=0;i<nbNotifications;i++) 
97    {
98      buffer>>poolId>>serviceId>>partitionId>>contextId ;
99      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
100    }
101  }
102
103  bool CService::eventLoop(bool serviceOnly)
104  {
105    //checkCreateContextNotification() ;
106    CTimer::get("CService::eventLoop").resume();
107   
108//    double time=MPI_Wtime() ;
109//    if (time-lastEventLoop_ > eventLoopLatency_)
110//    {
111      checkNotifications() ;
112//      lastEventLoop_=time ;
113//    }
114
115
116    eventScheduler_->checkEvent() ;
117    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
118    {
119      if (it->second->eventLoop(serviceOnly))
120      {
121        contexts_.erase(it) ;
122        // destroy server_context -> to do later
123        break ;
124      } ;
125    }
126    CTimer::get("CService::eventLoop").suspend();
127    if (contexts_.empty() && finalizeSignal_) return true ;
128    else return false ;
129  }
130
131  void CService::sendNotification(int rank)
132  {
133    winNotify_->lockWindowExclusive(rank) ;
134    winNotify_->pushToLockedWindow(rank, this, &CService::notificationsDumpOut) ;
135    winNotify_->unlockWindow(rank) ;
136  }
137
138 
139  void CService::notificationsDumpOut(CBufferOut& buffer)
140  {
141   
142    buffer.realloc(maxBufferSize_) ;
143   
144    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
145    {
146      auto& arg=notifyOutCreateContext_ ;
147      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
148    }
149  }
150
151  void CService::notificationsDumpIn(CBufferIn& buffer)
152  {
153    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
154    else
155    {
156      buffer>>notifyInType_;
157      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
158      {
159        auto& arg=notifyInCreateContext_ ;
160        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
161      }
162    }
163  }
164
165
166
167
168  void CService::checkNotifications(void)
169  {
170    if (!hasNotification_)
171    {
172      double time=MPI_Wtime() ;
173      if (time-lastEventLoop_ > eventLoopLatency_) 
174      {
175        int commRank ;
176        MPI_Comm_rank(serviceComm_, &commRank) ;
177        winNotify_->lockWindowExclusive(commRank) ;
178        winNotify_->popFromLockedWindow(commRank, this, &CService::notificationsDumpIn) ;
179        winNotify_->unlockWindow(commRank) ;
180     
181        if (notifyInType_!= NOTIFY_NOTHING)
182        {
183          hasNotification_=true ;
184          std::hash<string> hashString ;
185          size_t hashId = hashString(name_) ;
186          size_t currentTimeLine=0 ;
187          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ;
188          eventScheduler_->registerEvent(currentTimeLine,hashId); 
189        }
190        lastEventLoop_=time ;
191      }
192    }
193   
194    if (hasNotification_)
195    {
196      std::hash<string> hashString ;
197      size_t hashId = hashString(name_) ;
198      size_t currentTimeLine=0 ;
199      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
200      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
201      {
202        eventScheduler_->popEvent() ;
203        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
204        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
205        hasNotification_=false ;
206      }
207    }
208  }
209
210
211
212//ym not use any more
213  void CService::checkCreateContextNotification(void)
214  {
215    int commRank ;
216    MPI_Comm_rank(serviceComm_, &commRank) ;
217    winNotify_->lockWindow(commRank,0) ;
218    winNotify_->updateFromWindow(commRank, this, &CService::createContextDumpIn) ;
219   
220    if (!notifications_.empty())
221    {
222      auto info = notifications_.front() ;
223      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
224      notifications_.pop_front() ;
225      winNotify_->updateToWindow(commRank, this, &CService::createContextDumpOut) ;     
226    }
227    winNotify_->unlockWindow(commRank,0) ;
228  }
229
230  void CService::createContext(void)
231   {
232     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
233     auto& arg=notifyInCreateContext_ ;
234     string poolId = get<0>(arg) ;
235     string& serviceId = get<1>(arg) ;
236     int partitionId = get<2>(arg) ;
237     string contextId = get<3>(arg) ;
238     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
239   }
240
241   //to remove, not used anymore
242   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
243   {
244     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
245   }
246
247  void CService::finalizeSignal(void)
248  {
249    finalizeSignal_=true ;
250    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
251  }
252
253  CEventScheduler* CService::getEventScheduler(void)
254  {
255    return eventScheduler_ ;
256  }
257}
Note: See TracBrowser for help on using the repository browser.