source: XIOS/trunk/src/event_scheduler.cpp @ 492

Last change on this file since 492 was 492, checked in by ymipsl, 10 years ago

Add event scheduler functionnality in order to schedule events from different context, that cause Deadlock or crash when using collective MPI communication in netcdf/hdf5 library.

YM

  • Property svn:eol-style set to native
File size: 6.5 KB
Line 
1#include "event_scheduler.hpp"
2#include "xmlioserver_spl.hpp"
3#include "mpi.hpp"
4
5namespace xios
6{
7 
8 
9  CEventScheduler::CEventScheduler(const MPI_Comm& comm) 
10  {
11    MPI_Comm_dup(comm, &communicator) ;
12    MPI_Comm_size(communicator,&mpiSize) ;
13    MPI_Comm_rank(communicator,&mpiRank);
14
15
16    int maxChild=1 ;
17
18    int m ;
19    do
20    {
21      m=1 ;
22      maxChild=maxChild+1 ;
23      for(int i=0;i<maxChild;i++) m=m*maxChild ;
24     } while(m<mpiSize) ;
25   
26   
27    int maxLevel=0 ;
28    for(int size=1; size<=mpiSize; size*=maxChild) maxLevel++ ; 
29
30    int begin, end, nb ;
31    int pos, n ;
32 
33    parent=vector<int>(maxLevel+1) ;
34    child=vector<vector<int> >(maxLevel+1,vector<int>(maxChild)) ;
35    nbChild=vector<int> (maxLevel+1) ;
36   
37    level=0 ;
38    begin=0 ;
39    end=mpiSize-1 ;     
40    nb=end-begin+1 ;
41     
42    do
43    {
44      n=0 ;
45      pos=begin ;
46      nbChild[level]=0 ;
47      parent[level+1]=begin ;
48      for(int i=0;i<maxChild && i<nb ;i++)
49      {
50        if (i<nb%maxChild) n = nb/maxChild + 1 ;
51        else n = nb/maxChild ;
52     
53        if (mpiRank>=pos && mpiRank<pos+n)
54        {
55          begin=pos ;
56          end=pos+n-1 ;
57        }
58        child[level][i]=pos ;
59        pos=pos+n ;
60        nbChild[level]++ ;
61      } 
62      nb=end-begin+1 ;
63      level=level+1 ;
64    } while (nb>1) ;
65
66   
67  }
68
69  CEventScheduler::~CEventScheduler()
70  {
71
72  } 
73
74  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId)
75  {
76    registerEvent(timeLine, contextHashId, level) ;
77  }
78 
79  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
80  {
81       
82    SPendingRequest* sentRequest=new SPendingRequest ;
83    sentRequest->buffer[0]=timeLine ;
84    sentRequest->buffer[1]=contextHashId ;
85    sentRequest->buffer[2]=lev-1 ;
86
87    pendingSentParentRequest.push(sentRequest) ;
88    MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, parent[lev], 0, communicator, &sentRequest->request) ;
89  } 
90
91  bool CEventScheduler::queryEvent(const size_t timeLine, const size_t contextHashId)
92  {
93
94    if (! eventStack.empty() && eventStack.front().first==timeLine && eventStack.front().second==contextHashId)
95    {
96      eventStack.pop() ;
97      return true ;
98    }
99    else return false ; 
100  } 
101 
102  void CEventScheduler::checkEvent(void)
103  {
104    checkChildRequest() ;
105    checkParentRequest() ;
106   
107  }
108 
109 
110 
111  void CEventScheduler::checkParentRequest(void)
112  {
113    int completed ;
114    MPI_Status status ;
115    int received ;
116    SPendingRequest* recvRequest ;
117    completed=true ;
118   
119    // check sent request to parent
120    while (! pendingSentParentRequest.empty() && completed)
121    {
122      MPI_Test( & pendingSentParentRequest.front()->request, &completed, &status) ;
123      if (completed) 
124      {
125        delete pendingSentParentRequest.front() ;
126        pendingSentParentRequest.pop() ;
127      }
128    }
129   
130    // probe if a message is coming from parent
131    received=true ;
132    while(received)
133    {
134      MPI_Iprobe(MPI_ANY_SOURCE,1,communicator,&received, &status) ;
135      if (received)
136      {
137        recvRequest=new SPendingRequest ;
138        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 1, communicator, &(recvRequest->request)) ;
139        pendingRecvParentRequest.push(recvRequest) ;
140      }
141    }
142   
143     // check sent request from parent
144    completed=true ;
145    while (! pendingRecvParentRequest.empty() && completed)
146    {
147      recvRequest=pendingRecvParentRequest.front() ;
148      MPI_Test( &(recvRequest->request), &completed, &status) ;
149      if (completed) 
150      {
151        size_t timeLine=recvRequest->buffer[0] ;
152        size_t hashId=recvRequest->buffer[1] ;
153        size_t lev=recvRequest->buffer[2] ;
154        delete recvRequest ;
155        pendingRecvParentRequest.pop() ;       
156 
157        if (lev==level) eventStack.push(pair<size_t,size_t>(timeLine,hashId)) ;
158        else  bcastEvent(timeLine, hashId, lev) ;
159      }
160    }   
161   
162  }
163
164  void CEventScheduler::checkChildRequest(void)
165  {
166// function call only by parent mpi process
167
168    MPI_Status status ; 
169    int received ;
170    received=true ;
171    SPendingRequest* recvRequest ;
172   
173    // check for posted requests and make the corresponding receive
174    while(received)
175    {
176      MPI_Iprobe(MPI_ANY_SOURCE,0,communicator,&received, &status) ;
177      if (received)
178      {
179        recvRequest=new SPendingRequest ;
180        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, communicator, &recvRequest->request) ;
181        pendingRecvChildRequest.push_back(recvRequest) ;
182      }
183    }
184   
185    // check if receive request is achieved
186   
187    for(list<SPendingRequest*>::iterator it=pendingRecvChildRequest.begin(); it!=pendingRecvChildRequest.end() ; )
188    {
189      MPI_Test(&((*it)->request),&received,&status) ;
190      if (received)
191      {
192        size_t timeLine=(*it)->buffer[0] ;
193        size_t hashId=(*it)->buffer[1] ;
194        size_t lev=(*it)->buffer[2] ;
195       
196        SEvent event={timeLine,hashId,lev} ;
197        delete *it ; // free mem
198        it=pendingRecvChildRequest.erase(it) ; // get out of the list
199       
200        map< SEvent,int>::iterator itEvent=recvEvent.find(event) ;
201        if (itEvent==recvEvent.end()) 
202        {
203          itEvent=(recvEvent.insert(pair< SEvent ,int > (event,1))).first ;
204 
205        }
206        else (itEvent->second)++ ;
207        if (itEvent->second==nbChild[lev])
208        {
209          if (lev==0)
210          {
211            bcastEvent(timeLine,hashId,lev) ;
212            recvEvent.erase(itEvent) ;
213          }
214          else
215          {
216            registerEvent( timeLine,hashId,lev) ;
217          }
218        }
219      }
220      else ++it ;
221    }
222   
223    // check if bcast request is achieved
224
225    for(list<SPendingRequest*>::iterator it=pendingSentChildRequest.begin(); it!=pendingSentChildRequest.end() ; )
226    {
227      MPI_Test(&(*it)->request,&received,&status) ;
228      if (received)
229      {
230        delete *it ;    // free memory
231        it = pendingSentChildRequest.erase(it) ;          // get out of the list
232
233      }
234      else ++it ;
235       
236    }
237  }
238 
239  void CEventScheduler::bcastEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
240  {
241    SPendingRequest* sentRequest ;
242     
243   
244    for(int i=0; i<nbChild[lev];i++)
245    {
246      sentRequest=new SPendingRequest ;
247      sentRequest->buffer[0]=timeLine ;
248      sentRequest->buffer[1]=contextHashId ;
249      sentRequest->buffer[2]=lev+1 ;
250      MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, child[lev][i], 1, communicator, & sentRequest->request) ;
251      pendingSentChildRequest.push_back(sentRequest) ;
252    }
253  }
254   
255
256}
Note: See TracBrowser for help on using the repository browser.