source: XIOS3/trunk/src/manager/pool_ressource.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 6 weeks ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.4 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9#include "event_scheduler.hpp"
10#include "thread_manager.hpp"
11
12namespace xios
13{
14  extern CLogType logTimers ;
15 
16  CPoolRessource::CPoolRessource(MPI_Comm poolComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false)
17  {
18    int commRank, commSize ;
19    xios::MPI_Comm_dup(poolComm, &poolComm_) ;
20    CXios::getMpiGarbageCollector().registerCommunicator(poolComm_) ;
21    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_,"CPoolRessource::winNotify_") ;
22    MPI_Comm_rank(poolComm, &commRank) ;
23    MPI_Comm_size(poolComm, &commSize) ;
24    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
25    if (commRank==localLeader_)
26    {
27      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
28      int globalLeaderRank ;
29      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
30      if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ;
31    }
32   
33    notifyType_=NOTIFY_NOTHING;
34    winNotify_->updateToExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ;
35    MPI_Barrier(poolComm_) ;
36    if (eventScheduler) eventScheduler_=eventScheduler ;
37    else eventScheduler_= make_shared<CEventScheduler>(poolComm) ;
38    freeRessourceEventScheduler_ = eventScheduler_ ;
39    std::hash<string> hashString ;
40    hashId_ = hashString("CPoolRessource::"+Id) ;
41    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ;
42  }
43
44  void CPoolRessource::synchronize(void)
45  {
46    bool out=false ; 
47    size_t timeLine=0 ;
48         
49    eventScheduler_->registerEvent(timeLine, hashId_) ;
50    while (!out)
51    {
52      CThreadManager::yield() ;
53      out = eventScheduler_->queryEvent(timeLine,hashId_) ;
54      if (out) eventScheduler_->popEvent() ;
55    }
56  }
57
58  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
59  {
60    // for now suppose nbPartitions=1
61   
62    auto it=occupancy_.begin() ;
63
64    // ym obsolete, service cannot overlap, only created on separate ressource or matching excatly existing service
65    // occupancy management must not be used anymore => simplification
66    // for now raise a message error when no ressources are availables
67   
68    int commSize ;
69    MPI_Comm_size(poolComm_, &commSize) ;
70    vector<bool> procs_in(commSize,false) ;
71    vector<pair<int,int>> procs_update ;
72
73    for(int i=0; i<size; i++) 
74    {
75      if (it->first != 0) ERROR("void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)",
76                                 << "No enough free ressources on pool id="<<getId()<<" to launch service id="<<serviceId);
77      procs_in[it->second]=true ;
78      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
79      ++it ;
80    }
81
82
83    occupancy_.erase(occupancy_.begin(),it) ;
84    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
85   
86    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
87    for(int rank=0; rank<commSize; rank++)
88    {
89      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
90      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
91    }
92  }
93
94  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
95  {
96    // for now suppose nbPartitions=1
97   
98    auto it=occupancy_.begin() ;
99    int commSize ;
100    MPI_Comm_size(poolComm_, &commSize) ;
101   
102    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId
103            <<"  onto service Id  :"<< serviceId<<endl ;
104    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ;
105  }
106
107/* 
108  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,
109                                           bool in)
110  {
111    winNotify_->lockWindow(rank,0) ;
112    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
113    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
114    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
115    winNotify_->unlockWindow(rank,0) ;   
116  }
117*/
118 
119  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in)
120  {
121    notifyType_=NOTIFY_CREATE_SERVICE ;
122    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ;
123    sendNotification(rank) ;
124  }
125
126
127  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId)
128  {
129    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
130    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ;
131    sendNotification(rank) ;
132  }
133
134
135  void CPoolRessource::sendNotification(int rank)
136  {
137    winNotify_->pushToExclusiveWindow(rank, this, &CPoolRessource::notificationsDumpOut) ;
138  }
139
140  void CPoolRessource::checkNotifications(void)
141  {
142    int commRank ;
143    MPI_Comm_rank(poolComm_, &commRank) ;
144    winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
145    if (notifyType_==NOTIFY_CREATE_SERVICE) 
146    {
147      if (CThreadManager::isUsingThreads()) synchronize() ;
148      createService() ;
149    }
150    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 
151    {
152      if (CThreadManager::isUsingThreads()) synchronize() ;
153      createServiceOnto() ;
154    }
155  }
156
157
158  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer)
159  {
160
161    buffer.realloc(maxBufferSize_) ;
162   
163    if (notifyType_==NOTIFY_CREATE_SERVICE)
164    {
165      auto& arg=notifyCreateService_ ;
166      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg);
167    }
168    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
169    {
170      auto& arg=notifyCreateServiceOnto_ ;
171      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ;
172    }
173  }
174
175  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer)
176  {
177    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
178    else
179    {
180      buffer>>notifyType_;
181      if (notifyType_==NOTIFY_CREATE_SERVICE)
182      {
183        auto& arg=notifyCreateService_ ;
184        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ;
185      }
186      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
187      {
188        auto& arg=notifyCreateServiceOnto_ ;
189        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
190      }
191    }
192  } 
193
194  void CPoolRessource::createService(void)
195  {
196    auto& arg = notifyCreateService_ ;
197    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ;
198  }
199 
200  void CPoolRessource::createServiceOnto(void)
201  {
202    auto& arg = notifyCreateServiceOnto_ ;
203    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
204  }
205
206/*
207  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
208  {
209    buffer.realloc(maxBufferSize_) ;
210   
211    buffer << (int) (notifications_.size());
212   
213    for(auto it=notifications_.begin();it!=notifications_.end(); ++it)
214      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
215  }
216
217*/
218
219/*
220  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
221  {
222    std::string serviceId ;
223    int type ;
224    int size;
225    int nbPartitions;
226    bool in ;
227
228    notifications_.clear() ;
229    int nbNotifications ;
230    buffer>>nbNotifications ;
231    for(int i=0;i<nbNotifications;i++)
232    {
233      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
234      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
235    }
236  }
237*/
238
239  bool CPoolRessource::eventLoop(bool serviceOnly)
240  {
241    if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").resume();
242   
243    double time=MPI_Wtime() ;
244    int flag ;
245    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
246    if (time-lastEventLoop_ > eventLoopLatency_) 
247    {
248      //checkCreateServiceNotification() ;
249      checkNotifications() ;
250      lastEventLoop_=time ;
251    }
252   
253    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
254    {
255      if (it->second->eventLoop(serviceOnly))
256      {
257        delete it->second ;
258        services_.erase(it) ;
259        // don't forget to free service later
260        break ;
261      }
262    }
263    if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").suspend();
264    if (services_.empty() && finalizeSignal_) finished_=true ;
265    return finished_ ;
266  }
267
268  void CPoolRessource::threadEventLoop(void)
269  {
270    if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").resume();
271    info(100)<<"Launch Thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
272    CThreadManager::threadInitialize() ; 
273   
274    do
275    {
276
277      double time=MPI_Wtime() ;
278      int flag ;
279      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
280      if (time-lastEventLoop_ > eventLoopLatency_) 
281      {
282        //checkCreateServiceNotification() ;
283        checkNotifications() ;
284        lastEventLoop_=time ;
285      }
286   
287      for(auto it=services_.begin();it!=services_.end();++it) 
288      {
289        if (it->second->isFinished())
290        {
291          delete it->second ; 
292          services_.erase(it) ;
293          // destroy server_context -> to do later
294          break ;
295        } ;
296      }
297
298      if (services_.empty() && finalizeSignal_) finished_=true ;
299     
300      if (!finished_) CThreadManager::yield() ;
301   
302    } while (!finished_) ;
303
304    CThreadManager::threadFinalize() ;
305    if (info.isActive(logTimers)) CTimer::get("CPoolRessource::eventLoop").suspend();
306    info(100)<<"Close thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
307  }
308
309/*
310  void CPoolRessource::checkCreateServiceNotification(void)
311  {
312    int commRank ;
313    MPI_Comm_rank(poolComm_, &commRank) ;
314    winNotify_->lockWindow(commRank,0) ;
315    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
316   
317    if (!notifications_.empty())
318    {
319      auto info = notifications_.front() ;
320      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
321      notifications_.pop_front() ;
322      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
323    }
324    winNotify_->unlockWindow(commRank,0) ;
325
326  }
327*/
328
329  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
330  {
331     
332    info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
333    MPI_Comm serviceComm, newServiceComm, freeComm ;
334    int commRank ;
335     
336    int color;
337    if (!services_.empty()) color = 0 ;
338    else color=1 ; 
339    MPI_Comm_rank(poolComm_,&commRank) ;
340    xios::MPI_Comm_split(poolComm_, color, commRank, &freeComm) ;  // workaround
341   
342    if (services_.empty()) 
343    {
344      MPI_Comm_rank(freeComm,&commRank) ;
345      xios::MPI_Comm_split(freeComm, in, commRank, &serviceComm) ;
346
347      // temporary for event scheduler, we must using hierarchical split of free ressources communicator.
348      // we hope for now that spliting using occupancy make this match
349
350      if (in)
351      {
352        int serviceCommSize ;
353        int serviceCommRank ;
354        MPI_Comm_size(serviceComm,&serviceCommSize) ;
355        MPI_Comm_rank(serviceComm,&serviceCommRank) ;
356
357        info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
358                              <<" on rank pool "<<commRank<<endl ;
359       
360        int partitionId ; 
361        if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
362        {
363          int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
364          partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
365        }
366        else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
367
368        xios::MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
369
370        MPI_Comm_size(newServiceComm,&serviceCommSize) ;
371        MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
372        info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
373                << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
374     
375        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
376        freeRessourceEventScheduler_->splitScheduler(newServiceComm, parentScheduler, childScheduler) ;
377        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
378        isFirstSplit_=false ;
379
380        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, childScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
381       
382        xios::MPI_Comm_free(&newServiceComm) ;
383      }
384      else
385      {
386        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
387        freeRessourceEventScheduler_->splitScheduler(serviceComm, parentScheduler, childScheduler) ;
388        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
389        freeRessourceEventScheduler_ = childScheduler ;
390        isFirstSplit_=false ;
391      }
392      xios::MPI_Comm_free(&serviceComm) ;
393    }
394    xios::MPI_Comm_free(&freeComm) ;
395  }
396 
397  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
398  {
399     
400    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : "
401            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ;
402    for(auto& service : services_) 
403    {
404      if (std::get<0>(service.first)==onServiceId)
405      {
406        const MPI_Comm& serviceComm = service.second->getCommunicator() ;
407        MPI_Comm newServiceComm ;
408        xios::MPI_Comm_dup(serviceComm, &newServiceComm) ;
409        CXios::getMpiGarbageCollector().registerCommunicator(newServiceComm) ;
410        int nbPartitions = service.second->getNbPartitions() ;
411        int partitionId = service.second->getPartitionId() ;
412        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ;
413        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ;
414        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, eventScheduler, Id_, serviceId, partitionId, type,
415                                                                         nbPartitions) ;       
416      }
417    }
418   
419  }
420
421  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients
422  {
423    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
424  }
425
426
427  void CPoolRessource::finalizeSignal(void)
428  {
429    finalizeSignal_=true ;
430    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
431  } 
432 
433  CPoolRessource::~CPoolRessource()
434  {
435    delete winNotify_ ;
436    for(auto& service : services_) delete service.second ;
437  }
438}
Note: See TracBrowser for help on using the repository browser.