source: XIOS3/trunk/src/manager/servers_ressource.cpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 10 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 7.3 KB
Line 
1#include "servers_ressource.hpp"
2#include "window_manager.hpp"
3#include "ressources_manager.hpp"
4#include "pool_ressource.hpp"
5#include "event_scheduler.hpp"
6#include "cxios.hpp"
7#include "mpi.hpp"
8#include "timer.hpp"
9#include <vector>
10#include <string>
11#include "thread_manager.hpp"
12
13
14
15
16
17namespace xios
18{
19  using namespace std ;
20
21  CServersRessource::CServersRessource(MPI_Comm serverComm) : poolRessource_(nullptr), finalizeSignal_(false)
22  {
23
24    MPI_Comm_dup(serverComm, &serverComm_) ;
25    MPI_Comm xiosComm=CXios::getXiosComm() ;
26 
27    int localRank, globalRank ;
28    MPI_Comm_rank(xiosComm,&globalRank) ;
29    MPI_Comm_rank(serverComm_,&localRank) ;
30   
31    winNotify_ = new CWindowManager(serverComm_, maxBufferSize_) ;
32    MPI_Barrier(serverComm_) ;
33    if (localRank==localLeader_) 
34    {
35      int commSize ;
36      MPI_Comm_size(serverComm_,&commSize) ;
37      CXios::getRessourcesManager()->registerServerLeader(globalRank) ;
38      CXios::getRessourcesManager()->registerRessourcesSize(commSize) ;
39      freeRessourcesRank_.resize(commSize) ;
40      for(int i=0;i<commSize;i++) freeRessourcesRank_[i]=i ;
41    }
42
43    MPI_Comm_dup(serverComm_, &freeRessourcesComm_) ; 
44    eventScheduler_ = make_shared<CEventScheduler>(freeRessourcesComm_) ;
45    freeRessourceEventScheduler_ = eventScheduler_ ;
46    if (CThreadManager::isUsingThreads()) CThreadManager::spawnThread(&CServersRessource::threadEventLoop, this) ;
47  }
48
49  void CServersRessource::createPool(const string& poolId, const int size)
50  {
51    int commSize ;
52    MPI_Comm_size(serverComm_,&commSize) ;
53    vector<int> newFreeRessourcesRank(freeRessourcesRank_.size()-size) ;
54
55    bool isPartOf ;
56
57    for(int i=0, j=0; i<freeRessourcesRank_.size();i++) 
58    {
59       if (i<size) isPartOf=true ;
60       else 
61       {
62         isPartOf=false ;
63         newFreeRessourcesRank[j]=freeRessourcesRank_[i] ;
64         j++ ;
65       }
66       
67       notifyOutType_=NOTIFY_CREATE_POOL ;
68       notifyOutCreatePool_ = make_tuple(poolId, isPartOf) ;
69       sendNotification(freeRessourcesRank_[i]) ;
70    }
71    freeRessourcesRank_ = std::move(newFreeRessourcesRank) ;
72  }
73
74  void CServersRessource::finalize(void)
75  {
76    int commSize ;
77    MPI_Comm_size(serverComm_,&commSize) ;
78
79    for(int rank=0; rank<commSize;rank++)
80    { 
81      notifyOutType_=NOTIFY_FINALIZE ;
82      sendNotification(rank) ;
83    }
84  }
85
86  void CServersRessource::sendNotification(int rank)
87  {
88    winNotify_->pushToExclusiveWindow(rank, this, &CServersRessource::notificationsDumpOut) ;
89  }
90
91
92  void CServersRessource::notificationsDumpOut(CBufferOut& buffer)
93  {
94   
95    buffer.realloc(maxBufferSize_) ;
96   
97    if (notifyOutType_==NOTIFY_CREATE_POOL)
98    {
99      auto& arg=notifyOutCreatePool_ ;
100      buffer << notifyOutType_ << std::get<0>(arg) << std::get<1>(arg) ;
101    }
102    else if (notifyOutType_==NOTIFY_FINALIZE) buffer << notifyOutType_ ;
103  }
104
105  void CServersRessource::notificationsDumpIn(CBufferIn& buffer)
106  {
107    if (buffer.bufferSize() == 0) notifyInType_= NOTIFY_NOTHING ;
108    else
109    {
110      buffer>>notifyInType_;
111      if (notifyInType_==NOTIFY_CREATE_POOL)
112      {
113        auto& arg=notifyInCreatePool_ ;
114        buffer >> std::get<0>(arg) >> std::get<1>(arg)  ;
115      }
116      else if (notifyInType_==NOTIFY_FINALIZE) { /*nothing to do*/}
117    }
118  }
119
120  bool CServersRessource::eventLoop(bool serviceOnly)
121  {
122    CTimer::get("CServersRessource::eventLoop").resume();
123    double time=MPI_Wtime() ;
124    int flag ;
125    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
126
127    if (time-lastEventLoop_ > eventLoopLatency_) 
128    {
129      checkNotifications() ;
130      lastEventLoop_=time ;
131    }
132
133    if (poolRessource_!=nullptr) 
134    {
135      poolRessource_->eventLoop(serviceOnly) ;
136      if (poolRessource_->isFinished())
137      {
138        delete poolRessource_ ;
139        poolRessource_=nullptr ;
140        // don't forget to free pool ressource later
141      } 
142    }
143    CTimer::get("CServersRessource::eventLoop").suspend();
144    if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
145    return finished_ ;
146  }
147
148  void CServersRessource::threadEventLoop(void)
149  {
150    CTimer::get("CServersRessource::eventLoop").resume();
151    info(100)<<"Launch Thread for  CServersRessource::threadEventLoop"<<endl ;
152    CThreadManager::threadInitialize() ; 
153
154    do
155    {
156      double time=MPI_Wtime() ;
157      int flag ;
158      MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
159
160      if (time-lastEventLoop_ > eventLoopLatency_) 
161      {
162        checkNotifications() ;
163        lastEventLoop_=time ;
164      }
165
166      if (poolRessource_!=nullptr) 
167      {
168        if (poolRessource_->isFinished())
169        {
170          delete poolRessource_ ;
171          poolRessource_=nullptr ;
172          // don't forget to free pool ressource later
173        } 
174      }
175      CTimer::get("CServersRessource::eventLoop").suspend();
176      if (poolRessource_==nullptr && finalizeSignal_) finished_=true ;
177      if (!finished_) CThreadManager::yield() ;
178   
179    } while (!finished_) ;
180
181    CThreadManager::threadFinalize() ;
182    info(100)<<"Close thread for CServersRessource::threadEventLoop"<<endl ; ;
183  }
184
185
186  void CServersRessource::checkNotifications(void)
187  {
188    int commRank ;
189    MPI_Comm_rank(serverComm_, &commRank) ;
190    winNotify_->popFromExclusiveWindow(commRank, this, &CServersRessource::notificationsDumpIn) ;
191    if (notifyInType_==NOTIFY_CREATE_POOL) 
192    {
193      if (CThreadManager::isUsingThreads()) synchronize() ;
194      createPool() ;
195    }
196    else if (notifyInType_==NOTIFY_FINALIZE) finalizeSignal() ;
197  }
198
199  void CServersRessource::synchronize(void)
200  {
201    bool out=false ; 
202    size_t timeLine=0 ;
203    std::hash<string> hashString ;
204    int commSize ;
205    MPI_Comm_size(freeRessourcesComm_,&commSize) ;
206    size_t hashId = hashString("CServersRessource::"+to_string(commSize)) ;
207    freeRessourceEventScheduler_->registerEvent(timeLine, hashId) ;
208    while (!out)
209    {
210      CThreadManager::yield() ;
211      out = eventScheduler_->queryEvent(timeLine,hashId) ;
212      if (out) eventScheduler_->popEvent() ;
213    }
214  }
215
216  void CServersRessource::createPool(void)
217  {
218    auto& arg=notifyInCreatePool_ ;
219    string poolId=get<0>(arg) ;
220    bool isPartOf=get<1>(arg) ;
221   
222    int commRank ;
223    MPI_Comm poolComm ;
224    MPI_Comm_rank(freeRessourcesComm_,&commRank) ;
225    MPI_Comm_split(freeRessourcesComm_, isPartOf, commRank, &poolComm) ;
226   
227    shared_ptr<CEventScheduler> parentScheduler, childScheduler ;
228    freeRessourceEventScheduler_->splitScheduler(poolComm, parentScheduler, childScheduler) ;
229   
230    if (isFirstSplit_) eventScheduler_ = parentScheduler ; 
231    isFirstSplit_ = false ;
232
233    if (isPartOf)
234    { 
235      poolRessource_ = new CPoolRessource(poolComm, childScheduler, poolId, true) ;
236      MPI_Comm_free(&poolComm) ;
237    }
238    else 
239    {
240      freeRessourceEventScheduler_ = childScheduler ;
241      MPI_Comm_free(&freeRessourcesComm_) ;
242      freeRessourcesComm_=poolComm ;
243    }
244
245  }
246 
247  void CServersRessource::finalizeSignal(void)
248  {
249    finalizeSignal_=true ;
250    if (poolRessource_!=nullptr) poolRessource_->finalizeSignal() ;
251  }
252
253  bool CServersRessource::isServerLeader(void)
254  {
255    int commRank ;
256    MPI_Comm_rank(serverComm_,&commRank) ;
257    if (commRank==localLeader_) return true ;
258    else return false ;
259  }
260
261  CServersRessource::~CServersRessource()
262  {
263    delete winNotify_ ;
264  }
265}
Note: See TracBrowser for help on using the repository browser.