source: XIOS3/trunk/src/manager/servers_ressource.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 3 months ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 7.7 KB
Line 
1#include "servers_ressource.hpp"
2#include "window_manager.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "event_scheduler.hpp"
6#include "cxios.hpp"
7#include "mpi.hpp"
8#include "timer.hpp"
9#include <vector>
10#include <string>
11#include "thread_manager.hpp"
12
13
14
15
16
17namespace xios
18{
19  using namespace std ;
20  extern CLogType logTimers ;
21
22  CServersRessource::CServersRessource(MPI_Comm serverComm) : poolRessource_(nullptr), finalizeSignal_(false)
23  {
24
25    xios::MPI_Comm_dup(serverComm, &serverComm_) ;
26    CXios::getMpiGarbageCollector().registerCommunicator(serverComm_) ; 
27    MPI_Comm xiosComm=CXios::getXiosComm() ;
28 
29    int localRank, globalRank ;
30    MPI_Comm_rank(xiosComm,&globalRank) ;
31    MPI_Comm_rank(serverComm_,&localRank) ;
32   
33    winNotify_ = new CWindowManager(serverComm_, maxBufferSize_,"CServersRessource::winNotify_") ;
34    MPI_Barrier(serverComm_) ;
35    if (localRank==localLeader_) 
36    {
37      int commSize ;
38      MPI_Comm_size(serverComm_,&commSize) ;
39      CXios::getRessourcesManager()->registerServerLeader(globalRank) ;
40      CXios::getRessourcesManager()->registerRessourcesSize(commSize) ;
41      freeRessourcesRank_.resize(commSize) ;
42      for(int i=0;i<commSize;i++) freeRessourcesRank_[i]=i ;
43    }
44
45    xios::MPI_Comm_dup(serverComm_, &freeRessourcesComm_) ; 
46    CXios::getMpiGarbageCollector().registerCommunicator(freeRessourcesComm_) ;
47    eventScheduler_ = make_shared<CEventScheduler>(freeRessourcesComm_) ;
48    freeRessourceEventScheduler_ = eventScheduler_ ;
49    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServersRessource::threadEventLoop, this) ;
50  }
51
52  void CServersRessource::createPool(const string& poolId, const int size)
53  {
54    int commSize ;
55    MPI_Comm_size(serverComm_,&commSize) ;
56    vector<int> newFreeRessourcesRank(freeRessourcesRank_.size()-size) ;
57
58    bool isPartOf ;
59
60    for(int i=0, j=0; i<freeRessourcesRank_.size();i++) 
61    {
62       if (i<size) isPartOf=true ;
63       else 
64       {
65         isPartOf=false ;
66         newFreeRessourcesRank[j]=freeRessourcesRank_[i] ;
67         j++ ;
68       }
69       
70       notifyOutType_=NOTIFY_CREATE_POOL ;
71       notifyOutCreatePool_ = make_tuple(poolId, isPartOf) ;
72       sendNotification(freeRessourcesRank_[i]) ;
73    }
74    freeRessourcesRank_ = std::move(newFreeRessourcesRank) ;
75  }
76
77  void CServersRessource::finalize(void)
78  {
79    int commSize ;
80    MPI_Comm_size(serverComm_,&commSize) ;
81
82    for(int rank=0; rank<commSize;rank++)
83    { 
84      notifyOutType_=NOTIFY_FINALIZE ;
85      sendNotification(rank) ;
86    }
87  }
88
89  void CServersRessource::sendNotification(int rank)
90  {
91    winNotify_->pushToExclusiveWindow(rank, this, &CServersRessource::notificationsDumpOut) ;
92  }
93
94
95  void CServersRessource::notificationsDumpOut(CBufferOut& buffer)
96  {
97   
98    buffer.realloc(maxBufferSize_) ;
99   
100    if (notifyOutType_==NOTIFY_CREATE_POOL)
101    {
102      auto& arg=notifyOutCreatePool_ ;
103      buffer << notifyOutType_ << std::get<0>(arg) << std::get<1>(arg) ;
104    }
105    else if (notifyOutType_==NOTIFY_FINALIZE) buffer << notifyOutType_ ;
106  }
107
108  void CServersRessource::notificationsDumpIn(CBufferIn& buffer)
109  {
110    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
111    else
112    {
113      buffer>>notifyInType_;
114      if (notifyInType_==NOTIFY_CREATE_POOL)
115      {
116        auto& arg=notifyInCreatePool_ ;
117        buffer >> std::get<0>(arg) >> std::get<1>(arg)  ;
118      }
119      else if (notifyInType_==NOTIFY_FINALIZE) { /*nothing to do*/}
120    }
121  }
122
123  bool CServersRessource::eventLoop(bool serviceOnly)
124  {
125    if (info.isActive(logTimers)) CTimer::get("CServersRessource::eventLoop").resume();
126    double time=MPI_Wtime() ;
127    int flag ;
128    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
129
130    if (time-lastEventLoop_ > eventLoopLatency_) 
131    {
132      checkNotifications() ;
133      lastEventLoop_=time ;
134    }
135
136    if (poolRessource_!=nullptr) 
137    {
138      poolRessource_->eventLoop(serviceOnly) ;
139      if (poolRessource_->isFinished())
140      {
141        delete poolRessource_ ;
142        poolRessource_=nullptr ;
143        // don't forget to free pool ressource later
144      } 
145    }
146    if (info.isActive(logTimers)) CTimer::get("CServersRessource::eventLoop").suspend();
147    if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
148    return finished_ ;
149  }
150
151  void CServersRessource::threadEventLoop(void)
152  {
153    if (info.isActive(logTimers)) CTimer::get("CServersRessource::eventLoop").resume();
154    info(100)<<"Launch Thread for  CServersRessource::threadEventLoop"<<endl ;
155    CThreadManager::threadInitialize() ; 
156
157    do
158    {
159      double time=MPI_Wtime() ;
160      int flag ;
161      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
162
163      if (time-lastEventLoop_ > eventLoopLatency_) 
164      {
165        checkNotifications() ;
166        lastEventLoop_=time ;
167      }
168
169      if (poolRessource_!=nullptr) 
170      {
171        if (poolRessource_->isFinished())
172        {
173          delete poolRessource_ ;
174          poolRessource_=nullptr ;
175          // don't forget to free pool ressource later
176        } 
177      }
178      if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
179      if (!finished_) CThreadManager::yield() ;
180   
181    } while (!finished_) ;
182
183    CThreadManager::threadFinalize() ;
184    if (info.isActive(logTimers)) CTimer::get("CServersRessource::eventLoop").suspend();
185    info(100)<<"Close thread for CServersRessource::threadEventLoop"<<endl ; ;
186  }
187
188
189  void CServersRessource::checkNotifications(void)
190  {
191    int commRank ;
192    MPI_Comm_rank(serverComm_, &commRank) ;
193    winNotify_->popFromExclusiveWindow(commRank, this, &CServersRessource::notificationsDumpIn) ;
194    if (notifyInType_==NOTIFY_CREATE_POOL) 
195    {
196      if (CThreadManager::isUsingThreads()) synchronize() ;
197      createPool() ;
198    }
199    else if (notifyInType_==NOTIFY_FINALIZE) finalizeSignal() ;
200  }
201
202  void CServersRessource::synchronize(void)
203  {
204    bool out=false ; 
205    size_t timeLine=0 ;
206    std::hash<string> hashString ;
207    int commSize ;
208    MPI_Comm_size(freeRessourcesComm_,&commSize) ;
209    size_t hashId = hashString("CServersRessource::"+to_string(commSize)) ;
210    freeRessourceEventScheduler_->registerEvent(timeLine, hashId) ;
211    while (!out)
212    {
213      CThreadManager::yield() ;
214      out = eventScheduler_->queryEvent(timeLine,hashId) ;
215      if (out) eventScheduler_->popEvent() ;
216    }
217  }
218
219  void CServersRessource::createPool(void)
220  {
221    auto& arg=notifyInCreatePool_ ;
222    string poolId=get<0>(arg) ;
223    bool isPartOf=get<1>(arg) ;
224   
225    int commRank ;
226    MPI_Comm poolComm ;
227    MPI_Comm_rank(freeRessourcesComm_,&commRank) ;
228    xios::MPI_Comm_split(freeRessourcesComm_, isPartOf, commRank, &poolComm) ;
229   
230    shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
231    freeRessourceEventScheduler_->splitScheduler(poolComm, parentScheduler, childScheduler) ;
232   
233    if (isFirstSplit_) eventScheduler_ = parentScheduler ; 
234    isFirstSplit_ = false ;
235
236    if (isPartOf)
237    { 
238      poolRessource_ = new CPoolRessource(poolComm, childScheduler, poolId, true) ;
239      CXios::getMpiGarbageCollector().registerCommunicator(poolComm) ;
240    }
241    else 
242    {
243      freeRessourceEventScheduler_ = childScheduler ;
244      freeRessourcesComm_=poolComm ;
245      CXios::getMpiGarbageCollector().registerCommunicator(freeRessourcesComm_) ;
246    }
247
248  }
249 
250  void CServersRessource::finalizeSignal(void)
251  {
252    finalizeSignal_=true ;
253    if (poolRessource_!=nullptr) poolRessource_->finalizeSignal() ;
254  }
255
256  bool CServersRessource::isServerLeader(void)
257  {
258    int commRank ;
259    MPI_Comm_rank(serverComm_,&commRank) ;
260    if (commRank==localLeader_) return true ;
261    else return false ;
262  }
263
264  CServersRessource::~CServersRessource()
265  {
266    delete winNotify_ ;
267  }
268}
Note: See TracBrowser for help on using the repository browser.