source: XIOS3/trunk/src/event_scheduler.cpp @ 2569

Last change on this file since 2569 was 2569, checked in by jderouillat, 10 months ago

Clean memory associated to EventScheduler? hierarchy and MpiGarbageCollector?

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.8 KB
Line 
1#include "event_scheduler.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "tracer.hpp"
5#include "cxios.hpp"
6
7namespace xios
8{
9 
10 
11  CEventScheduler::CEventScheduler(const MPI_Comm& comm) 
12  {
13     schedulerLevel_=0 ;
14     parentScheduler_.reset();
15     childScheduler_.reset();
16     initialize(comm) ;
17  }
18 
19  CEventScheduler::CEventScheduler(const MPI_Comm& comm, size_t schedulerLevel) 
20  {
21     schedulerLevel_=schedulerLevel ;
22     parentScheduler_.reset();
23     childScheduler_.reset();
24     initialize(comm) ;
25  }
26 
27  void CEventScheduler::initialize(const MPI_Comm& comm) 
28  {
29    MPI_Comm_dup(comm, &communicator_) ;
30    CXios::getMpiGarbageCollector().registerCommunicator(communicator_) ;
31    MPI_Comm_size(communicator_,&mpiSize_) ;
32    MPI_Comm_rank(communicator_,&mpiRank_);
33
34
35    int maxChild=1 ;
36
37    int m ;
38    do
39    {
40      m=1 ;
41      maxChild=maxChild+1 ;
42      for(int i=0;i<maxChild;i++) m=m*maxChild ;
43     } while(m<mpiSize_) ;
44   
45   
46    int maxLevel=0 ;
47    for(int size=1; size<=mpiSize_; size*=maxChild) maxLevel++ ; 
48
49    int begin, end, nb ;
50    int pos, n ;
51 
52    parent_=vector<int>(maxLevel+1) ;
53    child_=vector<vector<int> >(maxLevel+1,vector<int>(maxChild)) ;
54    nbChild_=vector<int> (maxLevel+1) ;
55   
56    level_=0 ;
57    begin=0 ;
58    end=mpiSize_-1 ;     
59    nb=end-begin+1 ;
60     
61    do
62    {
63      n=0 ;
64      pos=begin ;
65      nbChild_[level_]=0 ;
66      parent_[level_+1]=begin ;
67      for(int i=0;i<maxChild && i<nb ;i++)
68      {
69        if (i<nb%maxChild) n = nb/maxChild + 1 ;
70        else n = nb/maxChild ;
71     
72        if (mpiRank_>=pos && mpiRank_<pos+n)
73        {
74          begin=pos ;
75          end=pos+n-1 ;
76        }
77        child_[level_][i]=pos ;
78        pos=pos+n ;
79        nbChild_[level_]++ ;
80      } 
81      nb=end-begin+1 ;
82      level_=level_+1 ;
83    } while (nb>1) ;
84
85   
86  }
87
88  CEventScheduler::~CEventScheduler()
89  {
90    while (!pendingSentParentRequest_.empty() || !pendingRecvParentRequest_.empty() || !pendingRecvChildRequest_.empty() ||  !pendingSentChildRequest_.empty())
91    {
92      checkEvent_() ;
93    } 
94    cleanSplitSchedulers();
95  }
96   
97  void CEventScheduler::cleanSplitSchedulers()
98  {
99    // Cleaning is operated recursively going from parent to child
100    if (parentScheduler_)
101    {
102      if (parentScheduler_->childScheduler_.get() == this)
103      {
104        parentScheduler_.reset();
105      }
106      else // if orphan (due to splitScheduler) : clean parent tree (it does not have child)
107      {
108        parentScheduler_->cleanSplitSchedulers();
109        parentScheduler_.reset();
110      }
111    }                   
112    if (childScheduler_)
113    {
114      childScheduler_->cleanSplitSchedulers();
115      childScheduler_.reset();
116    }
117  } 
118
119  void CEventScheduler::splitScheduler(const MPI_Comm& splittedComm, shared_ptr<CEventScheduler>& parent, shared_ptr<CEventScheduler>& child)
120  {
121    int color ;
122    MPI_Comm newComm ;
123    child = make_shared<CEventScheduler>(splittedComm, schedulerLevel_+ 1) ;
124    if (child->isRoot()) color=1 ;
125    else color=0 ;
126    MPI_Comm_split(communicator_, color, mpiRank_, &newComm) ;
127    CXios::getMpiGarbageCollector().registerCommunicator(newComm) ;
128
129    parent = make_shared<CEventScheduler>(newComm , schedulerLevel_) ;
130    child->setParentScheduler(parent) ;
131    parent->setChildScheduler(child) ;
132    if (parentScheduler_) 
133    {
134      parentScheduler_->setChildScheduler(parent) ;
135      parent->setParentScheduler(parentScheduler_) ;
136    }
137
138  }
139
140  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId)
141  {
142    getBaseScheduler()->registerEvent(timeLine, contextHashId, schedulerLevel_) ;
143    checkEvent_() ;
144  }
145 
146  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel)
147  {
148    registerEvent(timeLine, contextHashId, schedulerLevel, level_) ;
149    checkEvent_() ;
150  }
151
152  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel, const size_t lev)
153  {
154       
155    traceOff() ;
156    SPendingRequest* sentRequest=new SPendingRequest ;
157    sentRequest->buffer[0]=timeLine ;
158    sentRequest->buffer[1]=contextHashId ;
159    sentRequest->buffer[2]=schedulerLevel ;
160    sentRequest->buffer[3]=lev-1 ;
161
162    pendingSentParentRequest_.push(sentRequest) ;
163//    info(100)<<"CEventScheduler::registerEvent => send event to parent "<<parent_[lev]<<" of level" <<lev-1<<endl ;
164    MPI_Isend(sentRequest->buffer,4, MPI_UNSIGNED_LONG, parent_[lev], 0, communicator_, &sentRequest->request) ;
165    traceOn() ;
166  } 
167
168 
169  bool CEventScheduler::queryEvent_(const size_t timeLine, const size_t contextHashId)
170  {
171    checkEvent_() ;
172
173    if (! eventStack_.empty() && eventStack_.front().first==timeLine && eventStack_.front().second==contextHashId)
174    {
175      return true ;
176    }
177    else return false ; 
178  } 
179 
180  void CEventScheduler::checkEvent_(void)
181  {
182   
183    if (parentScheduler_) parentScheduler_->checkEvent_() ;
184    traceOff() ;
185    checkChildRequest() ;
186    checkParentRequest() ;
187    traceOn() ;
188   
189  }
190 
191  void CEventScheduler::checkParentRequest(void)
192  {
193    int completed ;
194    MPI_Status status ;
195    int received ;
196    SPendingRequest* recvRequest ;
197    completed=true ;
198   
199    // check sent request to parent
200    while (! pendingSentParentRequest_.empty() && completed)
201    {
202      MPI_Test( & pendingSentParentRequest_.front()->request, &completed, &status) ;
203      if (completed) 
204      {
205        delete pendingSentParentRequest_.front() ;
206        pendingSentParentRequest_.pop() ;
207      }
208    }
209   
210    // probe if a message is coming from parent
211    received=true ;
212    while(received)
213    {
214      MPI_Iprobe(MPI_ANY_SOURCE,1,communicator_,&received, &status) ;
215      if (received)
216      {
217        recvRequest=new SPendingRequest ;
218        MPI_Irecv(recvRequest->buffer, 4, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 1, communicator_, &(recvRequest->request)) ;
219        pendingRecvParentRequest_.push(recvRequest) ;
220      }
221    }
222   
223     // check sent request from parent
224    completed=true ;
225    while (! pendingRecvParentRequest_.empty() && completed)
226    {
227      recvRequest=pendingRecvParentRequest_.front() ;
228      MPI_Test( &(recvRequest->request), &completed, &status) ;
229
230      if (completed) 
231      {
232        size_t timeLine=recvRequest->buffer[0] ;
233        size_t hashId=recvRequest->buffer[1] ;
234        size_t schedulerLevel=recvRequest->buffer[2] ;
235        size_t lev=recvRequest->buffer[3] ;
236        delete recvRequest ;
237        pendingRecvParentRequest_.pop() ;       
238       
239//        info(100)<<"CEventScheduler::checkParentRequest => receive event from parent "<< status.MPI_SOURCE<<"at level"<< lev<< endl ;
240       
241        if (lev==level_) 
242        {
243          if (childScheduler_)
244          {
245//            info(100)<<"CEventScheduler::checkParentRequest => bcast event to child scheduler "<<endl;
246            childScheduler_->bcastEvent(timeLine, hashId, schedulerLevel, 0) ;
247          }
248          else
249          { 
250//            info(100)<<"CEventScheduler::checkParentRequest => put event to stack : timeLine : "<<timeLine<<"  hashId : "<<hashId<<endl;
251            eventStack_.push(pair<size_t,size_t>(timeLine,hashId)) ;
252          }
253        }
254        else 
255        {
256//          info(100)<<"CEventScheduler::checkParentRequest => bcast event to child process "<<endl;
257          bcastEvent(timeLine, hashId, schedulerLevel, lev) ;
258        }
259      }
260    }   
261   
262  }
263
264  void CEventScheduler::checkChildRequest(void)
265  {
266// function call only by parent mpi process
267
268    MPI_Status status ; 
269    int received ;
270    received=true ;
271    SPendingRequest* recvRequest ;
272   
273    // check for posted requests and make the corresponding receive
274    while(received)
275    {
276      MPI_Iprobe(MPI_ANY_SOURCE,0,communicator_,&received, &status) ;
277      if (received)
278      {
279        recvRequest=new SPendingRequest ;
280        MPI_Irecv(recvRequest->buffer, 4, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, communicator_, &recvRequest->request) ;
281        pendingRecvChildRequest_.push_back(recvRequest) ;
282      }
283    }
284   
285    // check if receive request is achieved
286   
287    for(list<SPendingRequest*>::iterator it=pendingRecvChildRequest_.begin(); it!=pendingRecvChildRequest_.end() ; )
288    {
289      MPI_Test(&((*it)->request),&received,&status) ;
290      if (received)
291      {
292        size_t timeLine=(*it)->buffer[0] ;
293        size_t hashId=(*it)->buffer[1] ;
294        size_t schedulerLevel=(*it)->buffer[2] ;
295        size_t lev=(*it)->buffer[3] ;
296       
297//        info(100)<<"CEventScheduler::checkChildRequest => received event from child "<<status.MPI_SOURCE<<" at level "<<lev<<endl;
298
299        SEvent event={timeLine, hashId, schedulerLevel, lev} ;
300        delete *it ; // free mem
301        it=pendingRecvChildRequest_.erase(it) ; // get out of the list
302       
303        map< SEvent,int>::iterator itEvent=recvEvent_.find(event) ;
304        if (itEvent==recvEvent_.end()) 
305        {
306          itEvent=(recvEvent_.insert(pair< SEvent ,int > (event,1))).first ;
307 
308        }
309        else (itEvent->second)++ ;
310        if (itEvent->second==nbChild_[lev])
311        {
312          if (lev==0)
313          {
314            if (schedulerLevel==schedulerLevel_) 
315            { 
316//              info(100)<<"CEventScheduler::checkChildRequest => bcastEvent to child"<<endl ;
317              bcastEvent(timeLine, hashId, schedulerLevel, lev) ;
318            }
319            else 
320            { 
321//              info(100)<<"CEventScheduler::checkChildRequest => register event to parent scheduler"<<endl ;
322              parentScheduler_->registerEvent(timeLine, hashId, schedulerLevel) ;
323            }
324            recvEvent_.erase(itEvent) ;
325          }
326          else
327          {
328//            info(100)<<"CEventScheduler::checkChildRequest => register event to parent process"<<endl ;
329            registerEvent( timeLine,hashId, schedulerLevel, lev) ;
330            recvEvent_.erase(itEvent) ;
331          }
332        }
333      }
334      else ++it ;
335    }
336   
337    // check if bcast request is achieved
338
339    for(list<SPendingRequest*>::iterator it=pendingSentChildRequest_.begin(); it!=pendingSentChildRequest_.end() ; )
340    {
341      MPI_Test(&(*it)->request,&received,&status) ;
342      if (received)
343      {
344        delete *it ;    // free memory
345        it = pendingSentChildRequest_.erase(it) ;          // get out of the list
346
347      }
348      else ++it ;
349       
350    }
351  }
352 
353  void CEventScheduler::bcastEvent(const size_t timeLine, const size_t contextHashId, const size_t schedulerLevel, const size_t lev)
354  {
355    SPendingRequest* sentRequest ;
356     
357   
358    for(int i=0; i<nbChild_[lev];i++)
359    {
360      sentRequest=new SPendingRequest ;
361      sentRequest->buffer[0]=timeLine ;
362      sentRequest->buffer[1]=contextHashId ;
363      sentRequest->buffer[2]=schedulerLevel ;
364      sentRequest->buffer[3]=lev+1 ;
365      MPI_Isend(sentRequest->buffer,4, MPI_UNSIGNED_LONG, child_[lev][i], 1, communicator_, & sentRequest->request) ;
366      pendingSentChildRequest_.push_back(sentRequest) ;
367    }
368  }
369   
370
371}
Note: See TracBrowser for help on using the repository browser.