source: XIOS2/dev/dev_ym/XIOS_COUPLING/src/manager/services.cpp @ 2360

Last change on this file since 2360 was 2287, checked in by jderouillat, 2 years ago

Fix for second level servers usage (nomenclature and recomment MPI_Win_free)

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.6 KB
Line 
1#include "services.hpp"
2#include "services_manager.hpp"
3#include "mpi.hpp"
4#include "cxios.hpp"
5#include "server_context.hpp"
6#include "event_scheduler.hpp"
7#include "timer.hpp"
8
9namespace xios
10{
11  CService::CService(MPI_Comm serviceComm, const std::string& poolId, const std::string& serviceId, const int& partitionId, 
12                     int type, int nbPartitions) : finalizeSignal_(false), eventScheduler_(nullptr), poolId_(poolId), serviceId_(serviceId),
13                                                   partitionId_(partitionId), type_(type), nbPartitions_(nbPartitions), hasNotification_(false)
14
15
16  {
17    info(40)<<"CService::CService  : new service created ; serviceId : "<<serviceId<<endl ;
18   
19    int localRank, globalRank, commSize ;
20
21    MPI_Comm_dup(serviceComm, &serviceComm_) ;
22    MPI_Comm globalComm_=CXios::getXiosComm() ;
23 
24    MPI_Comm_rank(globalComm_,&globalRank) ;
25    MPI_Comm_rank(serviceComm_,&localRank) ;
26   
27    winNotify_ = new CWindowManager(serviceComm_, maxBufferSize_) ;
28    winNotify_->lockWindow(localRank,0) ;
29    winNotify_->updateToWindow(localRank, this, &CService::createContextDumpOut) ;
30    winNotify_->unlockWindow(localRank,0) ;
31    MPI_Barrier(serviceComm_) ;
32    if (localRank==localLeader_) 
33    {
34      globalLeader_=globalRank ;
35      MPI_Comm_rank(serviceComm_,&commSize) ;
36      CXios::getServicesManager()->registerService(poolId, serviceId, partitionId, type, commSize, nbPartitions, globalLeader_) ;
37    }
38    eventScheduler_ = new CEventScheduler(serviceComm_) ;
39
40    ostringstream oss;
41    oss<<partitionId;
42    name_= poolId+"__"+serviceId+"_"+oss.str();
43  }
44
45  CService::~CService()
46  {
47    delete eventScheduler_ ;
48    delete winNotify_ ;
49    for(auto& it : contexts_) delete it.second ;
50  }
51
52
53  void CService::createContext( const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
54  {
55    int commSize ;
56    MPI_Comm_size(serviceComm_, &commSize) ;
57    info(40)<<"CService::createContext  : notify CreateContext to all services members ; serviceId : "<<serviceId<<" ; contextId : "<<contextId<<endl ;
58
59    for(int rank=0; rank<commSize; rank++) 
60    {
61      notifyOutType_=NOTIFY_CREATE_CONTEXT ;
62      notifyOutCreateContext_ = make_tuple(poolId, serviceId, partitionId, contextId) ;
63      sendNotification(rank) ;
64    }
65    info(40)<<"CService::createContext  : notify CreateContext to all services members : DONE "<<endl ;
66  }
67/*
68  void CService::createContext(const std::string& contextId)
69  {
70    contexts_[contextId] = new CServerContext(this, serviceComm_, poolId_, serviceId_, partitionId_, contextId) ;
71  }
72*/
73  void CService::createContextNotify(int rank, const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
74  {
75    winNotify_->lockWindow(rank,0) ;
76    winNotify_->updateFromWindow(rank, this, &CService::createContextDumpIn) ;
77    notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
78    winNotify_->updateToWindow(rank, this, &CService::createContextDumpOut) ; 
79    winNotify_->unlockWindow(rank,0) ;   
80  }
81
82
83  void CService::createContextDumpOut(CBufferOut& buffer)
84  {
85    buffer.realloc(maxBufferSize_) ;
86   
87    buffer << (int) (notifications_.size());
88   
89    for(auto it=notifications_.begin();it!=notifications_.end(); ++it) 
90      buffer << std::get<0>(*it) << std::get<1>(*it) << std::get<2>(*it) << std::get<3>(*it)  ;
91  }
92
93
94  void CService::createContextDumpIn(CBufferIn& buffer)
95  {
96    std::string poolId ;
97    std::string serviceId ;
98    int partitionId ;
99    std::string contextId ;
100   
101    notifications_.clear() ;
102    int nbNotifications ;
103    buffer>>nbNotifications ;
104    for(int i=0;i<nbNotifications;i++) 
105    {
106      buffer>>poolId>>serviceId>>partitionId>>contextId ;
107      notifications_.push_back(std::make_tuple(poolId, serviceId, partitionId, contextId)) ;
108    }
109  }
110
111  bool CService::eventLoop(bool serviceOnly)
112  {
113    //checkCreateContextNotification() ;
114    CTimer::get("CService::eventLoop").resume();
115    int flag ;
116    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
117   
118//    double time=MPI_Wtime() ;
119//    if (time-lastEventLoop_ > eventLoopLatency_)
120//    {
121      checkNotifications() ;
122//      lastEventLoop_=time ;
123//    }
124
125
126    eventScheduler_->checkEvent() ;
127    for(auto it=contexts_.begin();it!=contexts_.end();++it) 
128    {
129      if (it->second->eventLoop(serviceOnly))
130      {
131        delete it->second ; 
132        contexts_.erase(it) ;
133        // destroy server_context -> to do later
134        break ;
135      } ;
136    }
137    CTimer::get("CService::eventLoop").suspend();
138    if (contexts_.empty() && finalizeSignal_) return true ;
139    else return false ;
140  }
141
142  void CService::sendNotification(int rank)
143  {
144    winNotify_->lockWindowExclusive(rank) ;
145    winNotify_->pushToLockedWindow(rank, this, &CService::notificationsDumpOut) ;
146    winNotify_->unlockWindow(rank) ;
147  }
148
149 
150  void CService::notificationsDumpOut(CBufferOut& buffer)
151  {
152   
153    buffer.realloc(maxBufferSize_) ;
154   
155    if (notifyOutType_==NOTIFY_CREATE_CONTEXT)
156    {
157      auto& arg=notifyOutCreateContext_ ;
158      buffer << notifyOutType_ << std::get<0>(arg)<<std::get<1>(arg) << std::get<2>(arg)<<std::get<3>(arg) ;
159    }
160  }
161
162  void CService::notificationsDumpIn(CBufferIn& buffer)
163  {
164    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
165    else
166    {
167      buffer>>notifyInType_;
168      if (notifyInType_==NOTIFY_CREATE_CONTEXT)
169      {
170        auto& arg=notifyInCreateContext_ ;
171        buffer >> std::get<0>(arg)>> std::get<1>(arg) >> std::get<2>(arg)>> std::get<3>(arg);
172      }
173    }
174  }
175
176
177
178
179  void CService::checkNotifications(void)
180  {
181    if (!hasNotification_)
182    {
183      double time=MPI_Wtime() ;
184      if (time-lastEventLoop_ > eventLoopLatency_) 
185      {
186        int commRank ;
187        MPI_Comm_rank(serviceComm_, &commRank) ;
188        winNotify_->lockWindowExclusive(commRank) ;
189        winNotify_->popFromLockedWindow(commRank, this, &CService::notificationsDumpIn) ;
190        winNotify_->unlockWindow(commRank) ;
191     
192        if (notifyInType_!= NOTIFY_NOTHING)
193        {
194          hasNotification_=true ;
195          std::hash<string> hashString ;
196          size_t hashId = hashString(name_) ;
197          size_t currentTimeLine=0 ;
198          info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler"<<endl ;
199          eventScheduler_->registerEvent(currentTimeLine,hashId); 
200        }
201        lastEventLoop_=time ;
202      }
203    }
204   
205    if (hasNotification_)
206    {
207      std::hash<string> hashString ;
208      size_t hashId = hashString(name_) ;
209      size_t currentTimeLine=0 ;
210      info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : eventIsReceived ?"<<endl ;
211      if (eventScheduler_->queryEvent(currentTimeLine,hashId))
212      {
213        eventScheduler_->popEvent() ;
214        info(40)<<"CService::checkNotifications(void) : receive notification => event scheduler : RECEIVED"<<endl ;
215        if (notifyInType_==NOTIFY_CREATE_CONTEXT) createContext() ;
216        hasNotification_=false ;
217      }
218    }
219  }
220
221
222
223//ym not use any more
224  void CService::checkCreateContextNotification(void)
225  {
226    int commRank ;
227    MPI_Comm_rank(serviceComm_, &commRank) ;
228    winNotify_->lockWindow(commRank,0) ;
229    winNotify_->updateFromWindow(commRank, this, &CService::createContextDumpIn) ;
230   
231    if (!notifications_.empty())
232    {
233      auto info = notifications_.front() ;
234      createNewContext(get<0>(info), get<1>(info), get<2>(info), get<3>(info)) ;
235      notifications_.pop_front() ;
236      winNotify_->updateToWindow(commRank, this, &CService::createContextDumpOut) ;     
237    }
238    winNotify_->unlockWindow(commRank,0) ;
239  }
240
241  void CService::createContext(void)
242   {
243     info(40)<<"CService::createContext(void)  : receive createContext notification"<<endl ;
244     auto& arg=notifyInCreateContext_ ;
245     string poolId = get<0>(arg) ;
246     string serviceId = get<1>(arg) ;
247     int partitionId = get<2>(arg) ;
248     string contextId = get<3>(arg) ;
249     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ;
250   }
251
252   //to remove, not used anymore
253   void CService::createNewContext(const std::string& poolId, const std::string& serviceId, const int& partitionId, const std::string& contextId)
254   {
255     contexts_[contextId] = new CServerContext(this, serviceComm_, poolId, serviceId, partitionId, contextId) ; 
256   }
257
258  void CService::finalizeSignal(void)
259  {
260    finalizeSignal_=true ;
261    for(auto it=contexts_.begin();it!=contexts_.end();++it) it->second->finalizeSignal() ;
262  }
263
264  CEventScheduler* CService::getEventScheduler(void)
265  {
266    return eventScheduler_ ;
267  }
268}
Note: See TracBrowser for help on using the repository browser.