source: XIOS/dev/dev_olga/src/event_scheduler.cpp @ 1148

Last change on this file since 1148 was 992, checked in by oabramkina, 7 years ago

First rebond on the secondary server pool. XIOS finalizes correctly.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 6.5 KB
Line 
1#include "event_scheduler.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4
5namespace xios
6{
7 
8 
9  CEventScheduler::CEventScheduler(const MPI_Comm& comm) 
10  {
11    MPI_Comm_dup(comm, &communicator) ;
12    MPI_Comm_size(communicator,&mpiSize) ;
13    MPI_Comm_rank(communicator,&mpiRank);
14
15
16    int maxChild=1 ;
17
18    int m ;
19    do
20    {
21      m=1 ;
22      maxChild=maxChild+1 ;
23      for(int i=0;i<maxChild;i++) m=m*maxChild ;
24     } while(m<mpiSize) ;
25   
26   
27    int maxLevel=0 ;
28    for(int size=1; size<=mpiSize; size*=maxChild) maxLevel++ ; 
29
30    int begin, end, nb ;
31    int pos, n ;
32 
33    parent=vector<int>(maxLevel+1) ;
34    child=vector<vector<int> >(maxLevel+1,vector<int>(maxChild)) ;
35    nbChild=vector<int> (maxLevel+1) ;
36   
37    level=0 ;
38    begin=0 ;
39    end=mpiSize-1 ;     
40    nb=end-begin+1 ;
41     
42    do
43    {
44      n=0 ;
45      pos=begin ;
46      nbChild[level]=0 ;
47      parent[level+1]=begin ;
48      for(int i=0;i<maxChild && i<nb ;i++)
49      {
50        if (i<nb%maxChild) n = nb/maxChild + 1 ;
51        else n = nb/maxChild ;
52     
53        if (mpiRank>=pos && mpiRank<pos+n)
54        {
55          begin=pos ;
56          end=pos+n-1 ;
57        }
58        child[level][i]=pos ;
59        pos=pos+n ;
60        nbChild[level]++ ;
61      } 
62      nb=end-begin+1 ;
63      level=level+1 ;
64    } while (nb>1) ;
65
66   
67  }
68
69  CEventScheduler::~CEventScheduler()
70  {
71
72  } 
73
74  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId)
75  {
76    registerEvent(timeLine, contextHashId, level) ;
77  }
78 
79  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
80  {
81       
82    SPendingRequest* sentRequest=new SPendingRequest ;
83    sentRequest->buffer[0]=timeLine ;
84    sentRequest->buffer[1]=contextHashId ;
85    sentRequest->buffer[2]=lev-1 ;
86
87    pendingSentParentRequest.push(sentRequest) ;
88    MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, parent[lev], 0, communicator, &sentRequest->request) ;
89  } 
90
91  bool CEventScheduler::queryEvent(const size_t timeLine, const size_t contextHashId)
92  {
93
94    if (! eventStack.empty() && eventStack.front().first==timeLine && eventStack.front().second==contextHashId)
95    {
96      eventStack.pop() ;
97      return true ;
98    }
99    else return false ; 
100  } 
101 
102  void CEventScheduler::checkEvent(void)
103  {
104    checkChildRequest() ;
105    checkParentRequest() ;
106   
107  }
108 
109  void CEventScheduler::checkParentRequest(void)
110  {
111    int completed ;
112    MPI_Status status ;
113    int received ;
114    SPendingRequest* recvRequest ;
115    completed=true ;
116   
117    // check sent request to parent
118    while (! pendingSentParentRequest.empty() && completed)
119    {
120      MPI_Test( & pendingSentParentRequest.front()->request, &completed, &status) ;
121      if (completed) 
122      {
123        delete pendingSentParentRequest.front() ;
124        pendingSentParentRequest.pop() ;
125      }
126    }
127   
128    // probe if a message is coming from parent
129    received=true ;
130    while(received)
131    {
132      MPI_Iprobe(MPI_ANY_SOURCE,1,communicator,&received, &status) ;
133      if (received)
134      {
135        recvRequest=new SPendingRequest ;
136        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 1, communicator, &(recvRequest->request)) ;
137        pendingRecvParentRequest.push(recvRequest) ;
138      }
139    }
140   
141     // check sent request from parent
142    completed=true ;
143    while (! pendingRecvParentRequest.empty() && completed)
144    {
145      recvRequest=pendingRecvParentRequest.front() ;
146      MPI_Test( &(recvRequest->request), &completed, &status) ;
147      if (completed) 
148      {
149        size_t timeLine=recvRequest->buffer[0] ;
150        size_t hashId=recvRequest->buffer[1] ;
151        size_t lev=recvRequest->buffer[2] ;
152//        delete recvRequest ;
153        pendingRecvParentRequest.pop() ;       
154 
155        if (lev==level) eventStack.push(pair<size_t,size_t>(timeLine,hashId)) ;
156        else  bcastEvent(timeLine, hashId, lev) ;
157        delete recvRequest ;
158      }
159    }   
160   
161  }
162
163  void CEventScheduler::checkChildRequest(void)
164  {
165// function call only by parent mpi process
166
167    MPI_Status status ; 
168    int received ;
169    received=true ;
170    SPendingRequest* recvRequest ;
171   
172    // check for posted requests and make the corresponding receive
173    while(received)
174    {
175      MPI_Iprobe(MPI_ANY_SOURCE,0,communicator,&received, &status) ;
176      if (received)
177      {
178        recvRequest=new SPendingRequest ;
179        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, communicator, &recvRequest->request) ;
180        pendingRecvChildRequest.push_back(recvRequest) ;
181      }
182    }
183   
184    // check if receive request is achieved
185   
186    for(list<SPendingRequest*>::iterator it=pendingRecvChildRequest.begin(); it!=pendingRecvChildRequest.end() ; )
187    {
188      MPI_Test(&((*it)->request),&received,&status) ;
189      if (received)
190      {
191        size_t timeLine=(*it)->buffer[0] ;
192        size_t hashId=(*it)->buffer[1] ;
193        size_t lev=(*it)->buffer[2] ;
194       
195        SEvent event={timeLine,hashId,lev} ;
196        delete *it ; // free mem
197        it=pendingRecvChildRequest.erase(it) ; // get out of the list
198       
199        map< SEvent,int>::iterator itEvent=recvEvent.find(event) ;
200        if (itEvent==recvEvent.end()) 
201        {
202          itEvent=(recvEvent.insert(pair< SEvent ,int > (event,1))).first ;
203 
204        }
205        else (itEvent->second)++ ;
206        if (itEvent->second==nbChild[lev])
207        {
208          if (lev==0)
209          {
210            bcastEvent(timeLine,hashId,lev) ;
211            recvEvent.erase(itEvent) ;
212          }
213          else
214          {
215            registerEvent( timeLine,hashId,lev) ;
216          }
217        }
218      }
219      else ++it ;
220    }
221   
222    // check if bcast request is achieved
223
224    for(list<SPendingRequest*>::iterator it=pendingSentChildRequest.begin(); it!=pendingSentChildRequest.end() ; )
225    {
226      MPI_Test(&(*it)->request,&received,&status) ;
227      if (received)
228      {
229        delete *it ;    // free memory
230        it = pendingSentChildRequest.erase(it) ;          // get out of the list
231
232      }
233      else ++it ;
234       
235    }
236  }
237 
238  void CEventScheduler::bcastEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
239  {
240    SPendingRequest* sentRequest ;
241     
242   
243    for(int i=0; i<nbChild[lev];i++)
244    {
245      sentRequest=new SPendingRequest ;
246      sentRequest->buffer[0]=timeLine ;
247      sentRequest->buffer[1]=contextHashId ;
248      sentRequest->buffer[2]=lev+1 ;
249      MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, child[lev][i], 1, communicator, & sentRequest->request) ;
250      pendingSentChildRequest.push_back(sentRequest) ;
251    }
252  }
253   
254
255}
Note: See TracBrowser for help on using the repository browser.