source: XIOS/dev/dev_olga/src/event_scheduler.cpp @ 1314

Last change on this file since 1314 was 1158, checked in by oabramkina, 7 years ago

Two server levels: merging with trunk r1137.
There are bugs.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 6.5 KB
Line 
1#include "event_scheduler.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4
5namespace xios
6{
7 
8 
9  CEventScheduler::CEventScheduler(const MPI_Comm& comm) 
10  {
11    MPI_Comm_dup(comm, &communicator) ;
12    MPI_Comm_size(communicator,&mpiSize) ;
13    MPI_Comm_rank(communicator,&mpiRank);
14
15
16    int maxChild=1 ;
17
18    int m ;
19    do
20    {
21      m=1 ;
22      maxChild=maxChild+1 ;
23      for(int i=0;i<maxChild;i++) m=m*maxChild ;
24     } while(m<mpiSize) ;
25   
26   
27    int maxLevel=0 ;
28    for(int size=1; size<=mpiSize; size*=maxChild) maxLevel++ ; 
29
30    int begin, end, nb ;
31    int pos, n ;
32 
33    parent=vector<int>(maxLevel+1) ;
34    child=vector<vector<int> >(maxLevel+1,vector<int>(maxChild)) ;
35    nbChild=vector<int> (maxLevel+1) ;
36   
37    level=0 ;
38    begin=0 ;
39    end=mpiSize-1 ;     
40    nb=end-begin+1 ;
41     
42    do
43    {
44      n=0 ;
45      pos=begin ;
46      nbChild[level]=0 ;
47      parent[level+1]=begin ;
48      for(int i=0;i<maxChild && i<nb ;i++)
49      {
50        if (i<nb%maxChild) n = nb/maxChild + 1 ;
51        else n = nb/maxChild ;
52     
53        if (mpiRank>=pos && mpiRank<pos+n)
54        {
55          begin=pos ;
56          end=pos+n-1 ;
57        }
58        child[level][i]=pos ;
59        pos=pos+n ;
60        nbChild[level]++ ;
61      } 
62      nb=end-begin+1 ;
63      level=level+1 ;
64    } while (nb>1) ;
65
66   
67  }
68
69  CEventScheduler::~CEventScheduler()
70  {
71
72  } 
73
74  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId)
75  {
76    registerEvent(timeLine, contextHashId, level) ;
77  }
78 
79  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
80  {
81       
82    SPendingRequest* sentRequest=new SPendingRequest ;
83    sentRequest->buffer[0]=timeLine ;
84    sentRequest->buffer[1]=contextHashId ;
85    sentRequest->buffer[2]=lev-1 ;
86
87    pendingSentParentRequest.push(sentRequest) ;
88    MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, parent[lev], 0, communicator, &sentRequest->request) ;
89  } 
90
91  bool CEventScheduler::queryEvent(const size_t timeLine, const size_t contextHashId)
92  {
93
94    if (! eventStack.empty() && eventStack.front().first==timeLine && eventStack.front().second==contextHashId)
95    {
96      eventStack.pop() ;
97      return true ;
98    }
99    else return false ; 
100  } 
101 
102  void CEventScheduler::checkEvent(void)
103  {
104    checkChildRequest() ;
105    checkParentRequest() ;
106   
107  }
108 
109  void CEventScheduler::checkParentRequest(void)
110  {
111    int completed ;
112    MPI_Status status ;
113    int received ;
114    SPendingRequest* recvRequest ;
115    completed=true ;
116   
117    // check sent request to parent
118    while (! pendingSentParentRequest.empty() && completed)
119    {
120      MPI_Test( & pendingSentParentRequest.front()->request, &completed, &status) ;
121      if (completed) 
122      {
123        delete pendingSentParentRequest.front() ;
124        pendingSentParentRequest.pop() ;
125      }
126    }
127   
128    // probe if a message is coming from parent
129    received=true ;
130    while(received)
131    {
132      MPI_Iprobe(MPI_ANY_SOURCE,1,communicator,&received, &status) ;
133      if (received)
134      {
135        recvRequest=new SPendingRequest ;
136        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 1, communicator, &(recvRequest->request)) ;
137        pendingRecvParentRequest.push(recvRequest) ;
138      }
139    }
140   
141     // check sent request from parent
142    completed=true ;
143    while (! pendingRecvParentRequest.empty() && completed)
144    {
145      recvRequest=pendingRecvParentRequest.front() ;
146      MPI_Test( &(recvRequest->request), &completed, &status) ;
147      if (completed) 
148      {
149        size_t timeLine=recvRequest->buffer[0] ;
150        size_t hashId=recvRequest->buffer[1] ;
151        size_t lev=recvRequest->buffer[2] ;
152        delete recvRequest ;
153        pendingRecvParentRequest.pop() ;       
154 
155        if (lev==level) eventStack.push(pair<size_t,size_t>(timeLine,hashId)) ;
156        else  bcastEvent(timeLine, hashId, lev) ;
157      }
158    }   
159   
160  }
161
162  void CEventScheduler::checkChildRequest(void)
163  {
164// function call only by parent mpi process
165
166    MPI_Status status ; 
167    int received ;
168    received=true ;
169    SPendingRequest* recvRequest ;
170   
171    // check for posted requests and make the corresponding receive
172    while(received)
173    {
174      MPI_Iprobe(MPI_ANY_SOURCE,0,communicator,&received, &status) ;
175      if (received)
176      {
177        recvRequest=new SPendingRequest ;
178        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, communicator, &recvRequest->request) ;
179        pendingRecvChildRequest.push_back(recvRequest) ;
180      }
181    }
182   
183    // check if receive request is achieved
184   
185    for(list<SPendingRequest*>::iterator it=pendingRecvChildRequest.begin(); it!=pendingRecvChildRequest.end() ; )
186    {
187      MPI_Test(&((*it)->request),&received,&status) ;
188      if (received)
189      {
190        size_t timeLine=(*it)->buffer[0] ;
191        size_t hashId=(*it)->buffer[1] ;
192        size_t lev=(*it)->buffer[2] ;
193       
194        SEvent event={timeLine,hashId,lev} ;
195        delete *it ; // free mem
196        it=pendingRecvChildRequest.erase(it) ; // get out of the list
197       
198        map< SEvent,int>::iterator itEvent=recvEvent.find(event) ;
199        if (itEvent==recvEvent.end()) 
200        {
201          itEvent=(recvEvent.insert(pair< SEvent ,int > (event,1))).first ;
202 
203        }
204        else (itEvent->second)++ ;
205        if (itEvent->second==nbChild[lev])
206        {
207          if (lev==0)
208          {
209            bcastEvent(timeLine,hashId,lev) ;
210            recvEvent.erase(itEvent) ;
211          }
212          else
213          {
214            registerEvent( timeLine,hashId,lev) ;
215          }
216        }
217      }
218      else ++it ;
219    }
220   
221    // check if bcast request is achieved
222
223    for(list<SPendingRequest*>::iterator it=pendingSentChildRequest.begin(); it!=pendingSentChildRequest.end() ; )
224    {
225      MPI_Test(&(*it)->request,&received,&status) ;
226      if (received)
227      {
228        delete *it ;    // free memory
229        it = pendingSentChildRequest.erase(it) ;          // get out of the list
230
231      }
232      else ++it ;
233       
234    }
235  }
236 
237  void CEventScheduler::bcastEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
238  {
239    SPendingRequest* sentRequest ;
240     
241   
242    for(int i=0; i<nbChild[lev];i++)
243    {
244      sentRequest=new SPendingRequest ;
245      sentRequest->buffer[0]=timeLine ;
246      sentRequest->buffer[1]=contextHashId ;
247      sentRequest->buffer[2]=lev+1 ;
248      MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, child[lev][i], 1, communicator, & sentRequest->request) ;
249      pendingSentChildRequest.push_back(sentRequest) ;
250    }
251  }
252   
253
254}
Note: See TracBrowser for help on using the repository browser.