source: XIOS3/trunk/src/manager/pool_ressource.cpp @ 2523

Last change on this file since 2523 was 2523, checked in by ymipsl, 12 months ago

Adaptation to new hyper event scheduler.
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 13.2 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9#include "event_scheduler.hpp"
10
11namespace xios
12{
13  CPoolRessource::CPoolRessource(MPI_Comm poolComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false)
14  {
15    int commRank, commSize ;
16    MPI_Comm_dup(poolComm, &poolComm_) ;
17    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_) ;
18    MPI_Comm_rank(poolComm, &commRank) ;
19    MPI_Comm_size(poolComm, &commSize) ;
20    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
21    if (commRank==localLeader_)
22    {
23      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
24      int globalLeaderRank ;
25      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
26      if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ;
27    }
28   
29    notifyType_=NOTIFY_NOTHING;
30    winNotify_->updateToExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ;
31    MPI_Barrier(poolComm_) ;
32    if (eventScheduler) eventScheduler_=eventScheduler ;
33    else eventScheduler_= make_shared<CEventScheduler>(poolComm) ;
34    freeRessourceEventScheduler_ = eventScheduler_ ;
35  }
36
37  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
38  {
39    // for now suppose nbPartitions=1
40   
41    auto it=occupancy_.begin() ;
42
43    // ym obsolete, service cannot overlap, only created on separate ressource or matching excatly existing service
44    // occupancy management must not be used anymore => simplification
45    // for now raise a message error when no ressources are availables
46   
47    int commSize ;
48    MPI_Comm_size(poolComm_, &commSize) ;
49    vector<bool> procs_in(commSize,false) ;
50    vector<pair<int,int>> procs_update ;
51
52    for(int i=0; i<size; i++) 
53    {
54      if (it->first != 0) ERROR("void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)",
55                                 << "No enough free ressources on pool id="<<getId()<<" to launch service id="<<serviceId);
56      procs_in[it->second]=true ;
57      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
58      ++it ;
59    }
60
61
62    occupancy_.erase(occupancy_.begin(),it) ;
63    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
64   
65    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
66    for(int rank=0; rank<commSize; rank++)
67    {
68      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
69      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
70    }
71  }
72
73  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
74  {
75    // for now suppose nbPartitions=1
76   
77    auto it=occupancy_.begin() ;
78    int commSize ;
79    MPI_Comm_size(poolComm_, &commSize) ;
80   
81    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId
82            <<"  onto service Id  :"<< serviceId<<endl ;
83    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ;
84  }
85
86/* 
87  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,
88                                           bool in)
89  {
90    winNotify_->lockWindow(rank,0) ;
91    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
92    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
93    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
94    winNotify_->unlockWindow(rank,0) ;   
95  }
96*/
97 
98  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in)
99  {
100    notifyType_=NOTIFY_CREATE_SERVICE ;
101    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ;
102    sendNotification(rank) ;
103  }
104
105
106  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId)
107  {
108    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
109    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ;
110    sendNotification(rank) ;
111  }
112
113
114  void CPoolRessource::sendNotification(int rank)
115  {
116    winNotify_->pushToExclusiveWindow(rank, this, &CPoolRessource::notificationsDumpOut) ;
117  }
118
119  void CPoolRessource::checkNotifications(void)
120  {
121    int commRank ;
122    MPI_Comm_rank(poolComm_, &commRank) ;
123    winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
124    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
125    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
126  }
127
128
129  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer)
130  {
131
132    buffer.realloc(maxBufferSize_) ;
133   
134    if (notifyType_==NOTIFY_CREATE_SERVICE)
135    {
136      auto& arg=notifyCreateService_ ;
137      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg);
138    }
139    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
140    {
141      auto& arg=notifyCreateServiceOnto_ ;
142      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ;
143    }
144  }
145
146  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer)
147  {
148    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
149    else
150    {
151      buffer>>notifyType_;
152      if (notifyType_==NOTIFY_CREATE_SERVICE)
153      {
154        auto& arg=notifyCreateService_ ;
155        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ;
156      }
157      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
158      {
159        auto& arg=notifyCreateServiceOnto_ ;
160        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
161      }
162    }
163  } 
164
165  void CPoolRessource::createService(void)
166  {
167    auto& arg = notifyCreateService_ ;
168    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ;
169  }
170 
171  void CPoolRessource::createServiceOnto(void)
172  {
173    auto& arg = notifyCreateServiceOnto_ ;
174    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
175  }
176
177/*
178  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
179  {
180    buffer.realloc(maxBufferSize_) ;
181   
182    buffer << (int) (notifications_.size());
183   
184    for(auto it=notifications_.begin();it!=notifications_.end(); ++it)
185      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
186  }
187
188*/
189
190/*
191  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
192  {
193    std::string serviceId ;
194    int type ;
195    int size;
196    int nbPartitions;
197    bool in ;
198
199    notifications_.clear() ;
200    int nbNotifications ;
201    buffer>>nbNotifications ;
202    for(int i=0;i<nbNotifications;i++)
203    {
204      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
205      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
206    }
207  }
208*/
209
210  bool CPoolRessource::eventLoop(bool serviceOnly)
211  {
212    CTimer::get("CPoolRessource::eventLoop").resume();
213   
214    double time=MPI_Wtime() ;
215    int flag ;
216    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
217    if (time-lastEventLoop_ > eventLoopLatency_) 
218    {
219      //checkCreateServiceNotification() ;
220      checkNotifications() ;
221      lastEventLoop_=time ;
222    }
223   
224    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
225    {
226      if (it->second->eventLoop(serviceOnly))
227      {
228        delete it->second ;
229        services_.erase(it) ;
230        // don't forget to free service later
231        break ;
232      }
233    }
234    CTimer::get("CPoolRessource::eventLoop").suspend();
235    if (services_.empty() && finalizeSignal_) return true ;
236    else return false ;
237  }
238/*
239  void CPoolRessource::checkCreateServiceNotification(void)
240  {
241    int commRank ;
242    MPI_Comm_rank(poolComm_, &commRank) ;
243    winNotify_->lockWindow(commRank,0) ;
244    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
245   
246    if (!notifications_.empty())
247    {
248      auto info = notifications_.front() ;
249      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
250      notifications_.pop_front() ;
251      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
252    }
253    winNotify_->unlockWindow(commRank,0) ;
254
255  }
256*/
257
258  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
259  {
260     
261    info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
262    MPI_Comm serviceComm, newServiceComm, freeComm ;
263    int commRank ;
264     
265    int color;
266    if (!services_.empty()) color = 0 ;
267    else color=1 ; 
268    MPI_Comm_rank(poolComm_,&commRank) ;
269    MPI_Comm_split(poolComm_, color, commRank, &freeComm) ;  // workaround
270   
271    if (services_.empty()) 
272    {
273      MPI_Comm_rank(freeComm,&commRank) ;
274      MPI_Comm_split(freeComm, in, commRank, &serviceComm) ;
275
276      // temporary for event scheduler, we must using hierarchical split of free ressources communicator.
277      // we hope for now that spliting using occupancy make this match
278
279      if (in)
280      {
281        int serviceCommSize ;
282        int serviceCommRank ;
283        MPI_Comm_size(serviceComm,&serviceCommSize) ;
284        MPI_Comm_rank(serviceComm,&serviceCommRank) ;
285
286        info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
287                              <<" on rank pool "<<commRank<<endl ;
288       
289        int partitionId ; 
290        if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
291        {
292          int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
293          partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
294        }
295        else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
296
297        MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
298
299        MPI_Comm_size(newServiceComm,&serviceCommSize) ;
300        MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
301        info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
302                << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
303     
304        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
305        freeRessourceEventScheduler_->splitScheduler(newServiceComm, parentScheduler, childScheduler) ;
306        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
307        isFirstSplit_=false ;
308
309        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, childScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
310       
311        MPI_Comm_free(&newServiceComm) ;
312      }
313      else
314      {
315        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
316        freeRessourceEventScheduler_->splitScheduler(serviceComm, parentScheduler, childScheduler) ;
317        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
318        freeRessourceEventScheduler_ = childScheduler ;
319        isFirstSplit_=false ;
320      }
321      MPI_Comm_free(&serviceComm) ;
322    }
323    MPI_Comm_free(&freeComm) ;
324  }
325 
326  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
327  {
328     
329    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : "
330            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ;
331    for(auto& service : services_) 
332    {
333      if (std::get<0>(service.first)==onServiceId)
334      {
335        const MPI_Comm& serviceComm = service.second->getCommunicator() ;
336        MPI_Comm newServiceComm ;
337        MPI_Comm_dup(serviceComm, &newServiceComm) ;
338        int nbPartitions = service.second->getNbPartitions() ;
339        int partitionId = service.second->getPartitionId() ;
340        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ;
341        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ;
342        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, eventScheduler, Id_, serviceId, partitionId, type,
343                                                                         nbPartitions) ;       
344      }
345    }
346   
347  }
348
349  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached
350  {
351    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
352  }
353
354
355  void CPoolRessource::finalizeSignal(void)
356  {
357    finalizeSignal_=true ;
358    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
359  } 
360 
361  CPoolRessource::~CPoolRessource()
362  {
363    delete winNotify_ ;
364    for(auto& service : services_) delete service.second ;
365  }
366}
Note: See TracBrowser for help on using the repository browser.