source: XIOS3/trunk/src/manager/pool_ressource.cpp @ 2404

Last change on this file since 2404 was 2404, checked in by ymipsl, 21 months ago

Add the possibility to launch a service on same ressource than an other.
YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 11.5 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9
10namespace xios
11{
12  CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id) : Id_(Id), finalizeSignal_(false)
13  {
14    int commRank, commSize ;
15    MPI_Comm_dup(poolComm, &poolComm_) ;
16    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_) ;
17    MPI_Comm_rank(poolComm, &commRank) ;
18    MPI_Comm_size(poolComm, &commSize) ;
19    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
20    if (commRank==localLeader_)
21    {
22      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
23      int globalLeaderRank ;
24      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
25      CXios::getRessourcesManager()->registerPool(Id, commSize, globalLeaderRank) ;
26    }
27   
28    winNotify_->lockWindow(commRank,0) ;
29    winNotify_->updateToWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ;
30    winNotify_->unlockWindow(commRank,0) ;       
31    MPI_Barrier(poolComm_) ;
32  }
33
34  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
35  {
36    // for now suppose nbPartitions=1
37   
38    auto it=occupancy_.begin() ;
39    int commSize ;
40    MPI_Comm_size(poolComm_, &commSize) ;
41    vector<bool> procs_in(commSize,false) ;
42    vector<pair<int,int>> procs_update ;
43
44    for(int i=0; i<size; i++) 
45    {
46      procs_in[it->second]=true ;
47      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
48      ++it ;
49    }
50   
51    occupancy_.erase(occupancy_.begin(),it) ;
52    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
53   
54    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
55    for(int rank=0; rank<commSize; rank++)
56    {
57      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
58      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
59    }
60  }
61
62  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
63  {
64    // for now suppose nbPartitions=1
65   
66    auto it=occupancy_.begin() ;
67    int commSize ;
68    MPI_Comm_size(poolComm_, &commSize) ;
69   
70    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId
71            <<"  onto service Id  :"<< serviceId<<endl ;
72    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ;
73  }
74
75/* 
76  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,
77                                           bool in)
78  {
79    winNotify_->lockWindow(rank,0) ;
80    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
81    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
82    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
83    winNotify_->unlockWindow(rank,0) ;   
84  }
85*/
86 
87  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in)
88  {
89    notifyType_=NOTIFY_CREATE_SERVICE ;
90    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ;
91    sendNotification(rank) ;
92  }
93
94
95  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId)
96  {
97    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
98    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ;
99    sendNotification(rank) ;
100  }
101
102
103  void CPoolRessource::sendNotification(int rank)
104  {
105    winNotify_->lockWindowExclusive(rank) ;
106    winNotify_->pushToLockedWindow(rank, this, &CPoolRessource::notificationsDumpOut) ;
107    winNotify_->unlockWindow(rank) ;
108  }
109
110  void CPoolRessource::checkNotifications(void)
111  {
112    int commRank ;
113    MPI_Comm_rank(poolComm_, &commRank) ;
114    winNotify_->lockWindowExclusive(commRank) ;
115    winNotify_->popFromLockedWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
116    winNotify_->unlockWindow(commRank) ;
117    if (notifyType_==NOTIFY_CREATE_SERVICE) createService() ;
118    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) createServiceOnto() ;
119  }
120
121
122  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer)
123  {
124
125    buffer.realloc(maxBufferSize_) ;
126   
127    if (notifyType_==NOTIFY_CREATE_SERVICE)
128    {
129      auto& arg=notifyCreateService_ ;
130      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg);
131    }
132    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
133    {
134      auto& arg=notifyCreateServiceOnto_ ;
135      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ;
136    }
137  }
138
139  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer)
140  {
141    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
142    else
143    {
144      buffer>>notifyType_;
145      if (notifyType_==NOTIFY_CREATE_SERVICE)
146      {
147        auto& arg=notifyCreateService_ ;
148        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ;
149      }
150      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
151      {
152        auto& arg=notifyCreateServiceOnto_ ;
153        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
154      }
155    }
156  } 
157
158  void CPoolRessource::createService(void)
159  {
160    auto& arg = notifyCreateService_ ;
161    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ;
162  }
163 
164  void CPoolRessource::createServiceOnto(void)
165  {
166    auto& arg = notifyCreateServiceOnto_ ;
167    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
168  }
169
170/*
171  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
172  {
173    buffer.realloc(maxBufferSize_) ;
174   
175    buffer << (int) (notifications_.size());
176   
177    for(auto it=notifications_.begin();it!=notifications_.end(); ++it)
178      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
179  }
180
181*/
182
183/*
184  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
185  {
186    std::string serviceId ;
187    int type ;
188    int size;
189    int nbPartitions;
190    bool in ;
191
192    notifications_.clear() ;
193    int nbNotifications ;
194    buffer>>nbNotifications ;
195    for(int i=0;i<nbNotifications;i++)
196    {
197      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
198      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
199    }
200  }
201*/
202
203  bool CPoolRessource::eventLoop(bool serviceOnly)
204  {
205    CTimer::get("CPoolRessource::eventLoop").resume();
206   
207    double time=MPI_Wtime() ;
208    int flag ;
209    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
210    if (time-lastEventLoop_ > eventLoopLatency_) 
211    {
212      //checkCreateServiceNotification() ;
213      checkNotifications() ;
214      lastEventLoop_=time ;
215    }
216   
217    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
218    {
219      if (it->second->eventLoop(serviceOnly))
220      {
221        delete it->second ;
222        services_.erase(it) ;
223        // don't forget to free service later
224        break ;
225      }
226    }
227    CTimer::get("CPoolRessource::eventLoop").suspend();
228    if (services_.empty() && finalizeSignal_) return true ;
229    else return false ;
230  }
231/*
232  void CPoolRessource::checkCreateServiceNotification(void)
233  {
234    int commRank ;
235    MPI_Comm_rank(poolComm_, &commRank) ;
236    winNotify_->lockWindow(commRank,0) ;
237    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
238   
239    if (!notifications_.empty())
240    {
241      auto info = notifications_.front() ;
242      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
243      notifications_.pop_front() ;
244      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
245    }
246    winNotify_->unlockWindow(commRank,0) ;
247
248  }
249*/
250
251  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
252  {
253     
254     info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
255     MPI_Comm serviceComm, newServiceComm ;
256     int commRank ;
257     MPI_Comm_rank(poolComm_,&commRank) ;
258     MPI_Comm_split(poolComm_, in, commRank, &serviceComm) ;
259     if (in)
260     {
261       int serviceCommSize ;
262       int serviceCommRank ;
263       MPI_Comm_size(serviceComm,&serviceCommSize) ;
264       MPI_Comm_rank(serviceComm,&serviceCommRank) ;
265
266       info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
267                            <<" on rank pool "<<commRank<<endl ;
268       
269       int partitionId ; 
270       if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
271       {
272         int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
273         partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
274       }
275       else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
276
277       MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
278       
279       MPI_Comm_size(newServiceComm,&serviceCommSize) ;
280       MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
281       info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
282               << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
283     
284       services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
285       
286       MPI_Comm_free(&newServiceComm) ;
287     }
288     MPI_Comm_free(&serviceComm) ;
289  }
290 
291  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
292  {
293     
294    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : "
295            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ;
296    for(auto& service : services_) 
297    {
298      if (std::get<0>(service.first)==onServiceId)
299      {
300        const MPI_Comm& serviceComm = service.second->getCommunicator() ;
301        MPI_Comm newServiceComm ;
302        MPI_Comm_dup(serviceComm, &newServiceComm) ;
303        int nbPartitions = service.second->getNbPartitions() ;
304        int partitionId = service.second->getPartitionId() ;
305        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ;
306        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ;
307        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, Id_, serviceId, partitionId, type,
308                                                                         nbPartitions, eventScheduler) ;       
309      }
310    }
311   
312  }
313
314  void CPoolRessource::createService(MPI_Comm serviceComm, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients & attached
315  {
316    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, Id_, serviceId, partitionId, type, nbPartitions) ;
317  }
318
319
320  void CPoolRessource::finalizeSignal(void)
321  {
322    finalizeSignal_=true ;
323    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
324  } 
325 
326  CPoolRessource::~CPoolRessource()
327  {
328    delete winNotify_ ;
329    for(auto& service : services_) delete service.second ;
330  }
331}
Note: See TracBrowser for help on using the repository browser.