source: XIOS3/trunk/src/manager/pool_ressource.cpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introduce multithreading on the server side to handle deadlocks more efficiently (similar to coroutines, which will be available in a future C++ standard), based on C++ threads
  • Removal of the old "attached mode", which is replaced by online writer and reader filters

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.0 KB
Line 
1#include "pool_ressource.hpp"
2#include "services.hpp"
3#include "buffer_in.hpp"
4#include "buffer_out.hpp"
5#include "message.hpp"
6#include "type.hpp"
7#include "cxios.hpp"
8#include "timer.hpp"
9#include "event_scheduler.hpp"
10#include "thread_manager.hpp"
11
12namespace xios
13{
14  CPoolRessource::CPoolRessource(MPI_Comm poolComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false)
15  {
16    int commRank, commSize ;
17    MPI_Comm_dup(poolComm, &poolComm_) ;
18    winNotify_ = new CWindowManager(poolComm_, maxBufferSize_) ;
19    MPI_Comm_rank(poolComm, &commRank) ;
20    MPI_Comm_size(poolComm, &commSize) ;
21    info(40)<<"CPoolRessource::CPoolRessource  : creating new pool : "<<Id<<endl ;
22    if (commRank==localLeader_)
23    {
24      for(int i=0; i<commSize;i++) occupancy_.insert(std::pair<char,int>(0,i)) ; 
25      int globalLeaderRank ;
26      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ;
27      if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ;
28    }
29   
30    notifyType_=NOTIFY_NOTHING;
31    winNotify_->updateToExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpOut) ;
32    MPI_Barrier(poolComm_) ;
33    if (eventScheduler) eventScheduler_=eventScheduler ;
34    else eventScheduler_= make_shared<CEventScheduler>(poolComm) ;
35    freeRessourceEventScheduler_ = eventScheduler_ ;
36    std::hash<string> hashString ;
37    hashId_ = hashString("CPoolRessource::"+Id) ;
38    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CPoolRessource::threadEventLoop, this) ;
39  }
40
41  void CPoolRessource::synchronize(void)
42  {
43    bool out=false ; 
44    size_t timeLine=0 ;
45         
46    eventScheduler_->registerEvent(timeLine, hashId_) ;
47    while (!out)
48    {
49      CThreadManager::yield() ;
50      out = eventScheduler_->queryEvent(timeLine,hashId_) ;
51      if (out) eventScheduler_->popEvent() ;
52    }
53  }
54
55  void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)
56  {
57    // for now suppose nbPartitions=1
58   
59    auto it=occupancy_.begin() ;
60
61    // ym obsolete, service cannot overlap, only created on separate ressource or matching excatly existing service
62    // occupancy management must not be used anymore => simplification
63    // for now raise a message error when no ressources are availables
64   
65    int commSize ;
66    MPI_Comm_size(poolComm_, &commSize) ;
67    vector<bool> procs_in(commSize,false) ;
68    vector<pair<int,int>> procs_update ;
69
70    for(int i=0; i<size; i++) 
71    {
72      if (it->first != 0) ERROR("void CPoolRessource::createService(const std::string& serviceId, int type, int size, int nbPartitions)",
73                                 << "No enough free ressources on pool id="<<getId()<<" to launch service id="<<serviceId);
74      procs_in[it->second]=true ;
75      procs_update.push_back(std::pair<int,int>(it->first+1,it->second)) ;
76      ++it ;
77    }
78
79
80    occupancy_.erase(occupancy_.begin(),it) ;
81    occupancy_.insert(procs_update.begin(),procs_update.end()) ;
82   
83    info(40)<<"CPoolRessource::createService  : notify createService to all pool members ; serviceId : "<<serviceId<<endl ;
84    for(int rank=0; rank<commSize; rank++)
85    {
86      if (procs_in[rank]) createServiceNotify(rank, serviceId, type, size, nbPartitions, true) ;
87      else createServiceNotify(rank, serviceId, type, size, nbPartitions, false) ;
88    }
89  }
90
91  void CPoolRessource::createServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
92  {
93    // for now suppose nbPartitions=1
94   
95    auto it=occupancy_.begin() ;
96    int commSize ;
97    MPI_Comm_size(poolComm_, &commSize) ;
98   
99    info(40)<<"CPoolRessource::createService  : notify createServiceOnto to all pool members ; serviceId : "<<serviceId
100            <<"  onto service Id  :"<< serviceId<<endl ;
101    for(int rank=0; rank<commSize; rank++) createServiceOntoNotify(rank, serviceId, type, onServiceId) ;
102  }
103
104/* 
105  void CPoolRessource::createServiceNotify(int rank, const std::string& serviceId, int type, int size, int nbPartitions,
106                                           bool in)
107  {
108    winNotify_->lockWindow(rank,0) ;
109    winNotify_->updateFromWindow(rank, this, &CPoolRessource::createServiceDumpIn) ;
110    notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
111    winNotify_->updateToWindow(rank, this, &CPoolRessource::createServiceDumpOut) ; 
112    winNotify_->unlockWindow(rank,0) ;   
113  }
114*/
115 
116  void CPoolRessource::createServiceNotify(int rank, const string& serviceId, int type, int size, int nbPartitions, bool in)
117  {
118    notifyType_=NOTIFY_CREATE_SERVICE ;
119    notifyCreateService_=make_tuple(serviceId, type, size, nbPartitions, in ) ;
120    sendNotification(rank) ;
121  }
122
123
124  void CPoolRessource::createServiceOntoNotify(int rank, const string& serviceId, int type, const string& onServiceId)
125  {
126    notifyType_=NOTIFY_CREATE_SERVICE_ONTO ;
127    notifyCreateServiceOnto_=make_tuple(serviceId, type, onServiceId) ;
128    sendNotification(rank) ;
129  }
130
131
132  void CPoolRessource::sendNotification(int rank)
133  {
134    winNotify_->pushToExclusiveWindow(rank, this, &CPoolRessource::notificationsDumpOut) ;
135  }
136
137  void CPoolRessource::checkNotifications(void)
138  {
139    int commRank ;
140    MPI_Comm_rank(poolComm_, &commRank) ;
141    winNotify_->popFromExclusiveWindow(commRank, this, &CPoolRessource::notificationsDumpIn) ;
142    if (notifyType_==NOTIFY_CREATE_SERVICE) 
143    {
144      if (CThreadManager::isUsingThreads()) synchronize() ;
145      createService() ;
146    }
147    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO) 
148    {
149      if (CThreadManager::isUsingThreads()) synchronize() ;
150      createServiceOnto() ;
151    }
152  }
153
154
155  void CPoolRessource::notificationsDumpOut(CBufferOut& buffer)
156  {
157
158    buffer.realloc(maxBufferSize_) ;
159   
160    if (notifyType_==NOTIFY_CREATE_SERVICE)
161    {
162      auto& arg=notifyCreateService_ ;
163      buffer << notifyType_<< get<0>(arg) << get<1>(arg) << std::get<2>(arg) << get<3>(arg) << get<4>(arg);
164    }
165    else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
166    {
167      auto& arg=notifyCreateServiceOnto_ ;
168      buffer << notifyType_<< get<0>(arg) << get<1>(arg)<< get<2>(arg)  ;
169    }
170  }
171
172  void CPoolRessource::notificationsDumpIn(CBufferIn& buffer)
173  {
174    if (buffer.bufferSize() == 0) notifyType_= NOTIFY_NOTHING ;
175    else
176    {
177      buffer>>notifyType_;
178      if (notifyType_==NOTIFY_CREATE_SERVICE)
179      {
180        auto& arg=notifyCreateService_ ;
181        buffer >> get<0>(arg) >> get<1>(arg) >> std::get<2>(arg)>> get<3>(arg)>> get<4>(arg) ;
182      }
183      else if (notifyType_==NOTIFY_CREATE_SERVICE_ONTO)
184      {
185        auto& arg=notifyCreateServiceOnto_ ;
186        buffer >> get<0>(arg) >> get<1>(arg) >> get<2>(arg) ;
187      }
188    }
189  } 
190
191  void CPoolRessource::createService(void)
192  {
193    auto& arg = notifyCreateService_ ;
194    createNewService(get<0>(arg), get<1>(arg), get<2>(arg), get<3>(arg), get<4>(arg)) ;
195  }
196 
197  void CPoolRessource::createServiceOnto(void)
198  {
199    auto& arg = notifyCreateServiceOnto_ ;
200    createNewServiceOnto(get<0>(arg), get<1>(arg), get<2>(arg)) ;
201  }
202
203/*
204  void CPoolRessource::createServiceDumpOut(CBufferOut& buffer)
205  {
206    buffer.realloc(maxBufferSize_) ;
207   
208    buffer << (int) (notifications_.size());
209   
210    for(auto it=notifications_.begin();it!=notifications_.end(); ++it)
211      buffer << std::get<0>(*it) << static_cast<int>(std::get<1>(*it))<< std::get<2>(*it)<< std::get<3>(*it) << std::get<4>(*it)  ;
212  }
213
214*/
215
216/*
217  void CPoolRessource::createServiceDumpIn(CBufferIn& buffer)
218  {
219    std::string serviceId ;
220    int type ;
221    int size;
222    int nbPartitions;
223    bool in ;
224
225    notifications_.clear() ;
226    int nbNotifications ;
227    buffer>>nbNotifications ;
228    for(int i=0;i<nbNotifications;i++)
229    {
230      buffer>>serviceId>>type>>size>>nbPartitions>>in ;
231      notifications_.push_back(std::make_tuple(serviceId,type,size,nbPartitions,in)) ;
232    }
233  }
234*/
235
236  bool CPoolRessource::eventLoop(bool serviceOnly)
237  {
238    CTimer::get("CPoolRessource::eventLoop").resume();
239   
240    double time=MPI_Wtime() ;
241    int flag ;
242    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
243    if (time-lastEventLoop_ > eventLoopLatency_) 
244    {
245      //checkCreateServiceNotification() ;
246      checkNotifications() ;
247      lastEventLoop_=time ;
248    }
249   
250    for (auto it=services_.begin(); it!=services_.end() ; ++it) 
251    {
252      if (it->second->eventLoop(serviceOnly))
253      {
254        delete it->second ;
255        services_.erase(it) ;
256        // don't forget to free service later
257        break ;
258      }
259    }
260    CTimer::get("CPoolRessource::eventLoop").suspend();
261    if (services_.empty() && finalizeSignal_) finished_=true ;
262    return finished_ ;
263  }
264
265  void CPoolRessource::threadEventLoop(void)
266  {
267    CTimer::get("CPoolRessource::eventLoop").resume();
268    info(100)<<"Launch Thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
269    CThreadManager::threadInitialize() ; 
270   
271    do
272    {
273
274      double time=MPI_Wtime() ;
275      int flag ;
276      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
277      if (time-lastEventLoop_ > eventLoopLatency_) 
278      {
279        //checkCreateServiceNotification() ;
280        checkNotifications() ;
281        lastEventLoop_=time ;
282      }
283   
284      for(auto it=services_.begin();it!=services_.end();++it) 
285      {
286        if (it->second->isFinished())
287        {
288          delete it->second ; 
289          services_.erase(it) ;
290          // destroy server_context -> to do later
291          break ;
292        } ;
293      }
294
295      CTimer::get("CPoolRessource::eventLoop").suspend();
296      if (services_.empty() && finalizeSignal_) finished_=true ;
297     
298      if (!finished_) CThreadManager::yield() ;
299   
300    } while (!finished_) ;
301
302    CThreadManager::threadFinalize() ;
303    info(100)<<"Close thread for  CPoolRessource::threadEventLoop, pool id = "<<Id_<<endl ;
304  }
305
306/*
307  void CPoolRessource::checkCreateServiceNotification(void)
308  {
309    int commRank ;
310    MPI_Comm_rank(poolComm_, &commRank) ;
311    winNotify_->lockWindow(commRank,0) ;
312    winNotify_->updateFromWindow(commRank, this, &CPoolRessource::createServiceDumpIn) ;
313   
314    if (!notifications_.empty())
315    {
316      auto info = notifications_.front() ;
317      createNewService(get<0>(info), get<1>(info), get<2>(info), get<3>(info), get<4>(info)) ;
318      notifications_.pop_front() ;
319      winNotify_->updateToWindow(commRank, this, &CPoolRessource::createServiceDumpOut) ;     
320    }
321    winNotify_->unlockWindow(commRank,0) ;
322
323  }
324*/
325
326  void CPoolRessource::createNewService(const std::string& serviceId, int type, int size, int nbPartitions, bool in)
327  {
328     
329    info(40)<<"CPoolRessource::createNewService  : receive createService notification ; serviceId : "<<serviceId<<endl ;
330    MPI_Comm serviceComm, newServiceComm, freeComm ;
331    int commRank ;
332     
333    int color;
334    if (!services_.empty()) color = 0 ;
335    else color=1 ; 
336    MPI_Comm_rank(poolComm_,&commRank) ;
337    MPI_Comm_split(poolComm_, color, commRank, &freeComm) ;  // workaround
338   
339    if (services_.empty()) 
340    {
341      MPI_Comm_rank(freeComm,&commRank) ;
342      MPI_Comm_split(freeComm, in, commRank, &serviceComm) ;
343
344      // temporary for event scheduler, we must using hierarchical split of free ressources communicator.
345      // we hope for now that spliting using occupancy make this match
346
347      if (in)
348      {
349        int serviceCommSize ;
350        int serviceCommRank ;
351        MPI_Comm_size(serviceComm,&serviceCommSize) ;
352        MPI_Comm_rank(serviceComm,&serviceCommRank) ;
353
354        info(10)<<"Service  "<<serviceId<<" created "<<"  service size : "<<serviceCommSize<< "   service rank : "<<serviceCommRank
355                              <<" on rank pool "<<commRank<<endl ;
356       
357        int partitionId ; 
358        if ( serviceCommRank >= (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) )
359        {
360          int rank =  serviceCommRank - (serviceCommSize/nbPartitions+1)*(serviceCommSize%nbPartitions) ;
361          partitionId = serviceCommSize%nbPartitions +  rank / (serviceCommSize/nbPartitions) ;
362        }
363        else  partitionId = serviceCommRank / (serviceCommSize/nbPartitions + 1) ;
364
365        MPI_Comm_split(serviceComm, partitionId, commRank, &newServiceComm) ;
366
367        MPI_Comm_size(newServiceComm,&serviceCommSize) ;
368        MPI_Comm_rank(newServiceComm,&serviceCommRank) ;
369        info(10)<<"Service  "<<serviceId<<" created "<<"  partition : " <<partitionId<<" service size : "<<serviceCommSize
370                << " service rank : "<<serviceCommRank <<" on rank pool "<<commRank<<endl ;
371     
372        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
373        freeRessourceEventScheduler_->splitScheduler(newServiceComm, parentScheduler, childScheduler) ;
374        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
375        isFirstSplit_=false ;
376
377        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, childScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
378       
379        MPI_Comm_free(&newServiceComm) ;
380      }
381      else
382      {
383        shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
384        freeRessourceEventScheduler_->splitScheduler(serviceComm, parentScheduler, childScheduler) ;
385        if (isFirstSplit_) eventScheduler_ = parentScheduler ;
386        freeRessourceEventScheduler_ = childScheduler ;
387        isFirstSplit_=false ;
388      }
389      MPI_Comm_free(&serviceComm) ;
390    }
391    MPI_Comm_free(&freeComm) ;
392  }
393 
394  void CPoolRessource::createNewServiceOnto(const std::string& serviceId, int type, const std::string& onServiceId)
395  {
396     
397    info(40)<<"CPoolRessource::createNewServiceOnto  : receive createServiceOnto notification ; serviceId : "
398            <<serviceId<<"  ontoServiceId : "<<onServiceId<<endl ;
399    for(auto& service : services_) 
400    {
401      if (std::get<0>(service.first)==onServiceId)
402      {
403        const MPI_Comm& serviceComm = service.second->getCommunicator() ;
404        MPI_Comm newServiceComm ;
405        MPI_Comm_dup(serviceComm, &newServiceComm) ;
406        int nbPartitions = service.second->getNbPartitions() ;
407        int partitionId = service.second->getPartitionId() ;
408        shared_ptr<CEventScheduler>  eventScheduler = service.second->getEventScheduler() ;
409        info(40)<<"CPoolRessource::createNewServiceOnto ; found onServiceId : "<<onServiceId<<endl  ;
410        services_[std::make_tuple(serviceId,partitionId)] = new CService(newServiceComm, eventScheduler, Id_, serviceId, partitionId, type,
411                                                                         nbPartitions) ;       
412      }
413    }
414   
415  }
416
417  void CPoolRessource::createService(MPI_Comm serviceComm, shared_ptr<CEventScheduler> eventScheduler, const std::string& serviceId, int partitionId, int type, int nbPartitions) // for clients
418  {
419    services_[std::make_tuple(serviceId,partitionId)] = new CService(serviceComm, eventScheduler, Id_, serviceId, partitionId, type, nbPartitions) ;
420  }
421
422
423  void CPoolRessource::finalizeSignal(void)
424  {
425    finalizeSignal_=true ;
426    for (auto it=services_.begin(); it!=services_.end() ; ++it) it->second->finalizeSignal() ;
427  } 
428 
429  CPoolRessource::~CPoolRessource()
430  {
431    delete winNotify_ ;
432    for(auto& service : services_) delete service.second ;
433  }
434}
Note: See TracBrowser for help on using the repository browser.