source: XIOS/trunk/src/event_scheduler.cpp @ 492

Last change on this file since 492 was 492, checked in by ymipsl, 7 years ago

Add event scheduler functionnality in order to schedule events from different context, that cause Deadlock or crash when using collective MPI communication in netcdf/hdf5 library.

YM

  • Property svn:eol-style set to native
File size: 6.5 KB
Line 
1#include "event_scheduler.hpp"
2#include "xmlioserver_spl.hpp"
3#include "mpi.hpp"
4
5namespace xios
6{
7 
8 
9  CEventScheduler::CEventScheduler(const MPI_Comm& comm) 
10  {
11    MPI_Comm_dup(comm, &communicator) ;
12    MPI_Comm_size(communicator,&mpiSize) ;
13    MPI_Comm_rank(communicator,&mpiRank);
14
15
16    int maxChild=1 ;
17
18    int m ;
19    do
20    {
21      m=1 ;
22      maxChild=maxChild+1 ;
23      for(int i=0;i<maxChild;i++) m=m*maxChild ;
24     } while(m<mpiSize) ;
25   
26   
27    int maxLevel=0 ;
28    for(int size=1; size<=mpiSize; size*=maxChild) maxLevel++ ; 
29
30    int begin, end, nb ;
31    int pos, n ;
32 
33    parent=vector<int>(maxLevel+1) ;
34    child=vector<vector<int> >(maxLevel+1,vector<int>(maxChild)) ;
35    nbChild=vector<int> (maxLevel+1) ;
36   
37    level=0 ;
38    begin=0 ;
39    end=mpiSize-1 ;     
40    nb=end-begin+1 ;
41     
42    do
43    {
44      n=0 ;
45      pos=begin ;
46      nbChild[level]=0 ;
47      parent[level+1]=begin ;
48      for(int i=0;i<maxChild && i<nb ;i++)
49      {
50        if (i<nb%maxChild) n = nb/maxChild + 1 ;
51        else n = nb/maxChild ;
52     
53        if (mpiRank>=pos && mpiRank<pos+n)
54        {
55          begin=pos ;
56          end=pos+n-1 ;
57        }
58        child[level][i]=pos ;
59        pos=pos+n ;
60        nbChild[level]++ ;
61      } 
62      nb=end-begin+1 ;
63      level=level+1 ;
64    } while (nb>1) ;
65
66   
67  }
68
69  CEventScheduler::~CEventScheduler()
70  {
71
72  } 
73
74  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId)
75  {
76    registerEvent(timeLine, contextHashId, level) ;
77  }
78 
79  void CEventScheduler::registerEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
80  {
81       
82    SPendingRequest* sentRequest=new SPendingRequest ;
83    sentRequest->buffer[0]=timeLine ;
84    sentRequest->buffer[1]=contextHashId ;
85    sentRequest->buffer[2]=lev-1 ;
86
87    pendingSentParentRequest.push(sentRequest) ;
88    MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, parent[lev], 0, communicator, &sentRequest->request) ;
89  } 
90
91  bool CEventScheduler::queryEvent(const size_t timeLine, const size_t contextHashId)
92  {
93
94    if (! eventStack.empty() && eventStack.front().first==timeLine && eventStack.front().second==contextHashId)
95    {
96      eventStack.pop() ;
97      return true ;
98    }
99    else return false ; 
100  } 
101 
102  void CEventScheduler::checkEvent(void)
103  {
104    checkChildRequest() ;
105    checkParentRequest() ;
106   
107  }
108 
109 
110 
111  void CEventScheduler::checkParentRequest(void)
112  {
113    int completed ;
114    MPI_Status status ;
115    int received ;
116    SPendingRequest* recvRequest ;
117    completed=true ;
118   
119    // check sent request to parent
120    while (! pendingSentParentRequest.empty() && completed)
121    {
122      MPI_Test( & pendingSentParentRequest.front()->request, &completed, &status) ;
123      if (completed) 
124      {
125        delete pendingSentParentRequest.front() ;
126        pendingSentParentRequest.pop() ;
127      }
128    }
129   
130    // probe if a message is coming from parent
131    received=true ;
132    while(received)
133    {
134      MPI_Iprobe(MPI_ANY_SOURCE,1,communicator,&received, &status) ;
135      if (received)
136      {
137        recvRequest=new SPendingRequest ;
138        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 1, communicator, &(recvRequest->request)) ;
139        pendingRecvParentRequest.push(recvRequest) ;
140      }
141    }
142   
143     // check sent request from parent
144    completed=true ;
145    while (! pendingRecvParentRequest.empty() && completed)
146    {
147      recvRequest=pendingRecvParentRequest.front() ;
148      MPI_Test( &(recvRequest->request), &completed, &status) ;
149      if (completed) 
150      {
151        size_t timeLine=recvRequest->buffer[0] ;
152        size_t hashId=recvRequest->buffer[1] ;
153        size_t lev=recvRequest->buffer[2] ;
154        delete recvRequest ;
155        pendingRecvParentRequest.pop() ;       
156 
157        if (lev==level) eventStack.push(pair<size_t,size_t>(timeLine,hashId)) ;
158        else  bcastEvent(timeLine, hashId, lev) ;
159      }
160    }   
161   
162  }
163
164  void CEventScheduler::checkChildRequest(void)
165  {
166// function call only by parent mpi process
167
168    MPI_Status status ; 
169    int received ;
170    received=true ;
171    SPendingRequest* recvRequest ;
172   
173    // check for posted requests and make the corresponding receive
174    while(received)
175    {
176      MPI_Iprobe(MPI_ANY_SOURCE,0,communicator,&received, &status) ;
177      if (received)
178      {
179        recvRequest=new SPendingRequest ;
180        MPI_Irecv(recvRequest->buffer, 3, MPI_UNSIGNED_LONG, MPI_ANY_SOURCE, 0, communicator, &recvRequest->request) ;
181        pendingRecvChildRequest.push_back(recvRequest) ;
182      }
183    }
184   
185    // check if receive request is achieved
186   
187    for(list<SPendingRequest*>::iterator it=pendingRecvChildRequest.begin(); it!=pendingRecvChildRequest.end() ; )
188    {
189      MPI_Test(&((*it)->request),&received,&status) ;
190      if (received)
191      {
192        size_t timeLine=(*it)->buffer[0] ;
193        size_t hashId=(*it)->buffer[1] ;
194        size_t lev=(*it)->buffer[2] ;
195       
196        SEvent event={timeLine,hashId,lev} ;
197        delete *it ; // free mem
198        it=pendingRecvChildRequest.erase(it) ; // get out of the list
199       
200        map< SEvent,int>::iterator itEvent=recvEvent.find(event) ;
201        if (itEvent==recvEvent.end()) 
202        {
203          itEvent=(recvEvent.insert(pair< SEvent ,int > (event,1))).first ;
204 
205        }
206        else (itEvent->second)++ ;
207        if (itEvent->second==nbChild[lev])
208        {
209          if (lev==0)
210          {
211            bcastEvent(timeLine,hashId,lev) ;
212            recvEvent.erase(itEvent) ;
213          }
214          else
215          {
216            registerEvent( timeLine,hashId,lev) ;
217          }
218        }
219      }
220      else ++it ;
221    }
222   
223    // check if bcast request is achieved
224
225    for(list<SPendingRequest*>::iterator it=pendingSentChildRequest.begin(); it!=pendingSentChildRequest.end() ; )
226    {
227      MPI_Test(&(*it)->request,&received,&status) ;
228      if (received)
229      {
230        delete *it ;    // free memory
231        it = pendingSentChildRequest.erase(it) ;          // get out of the list
232
233      }
234      else ++it ;
235       
236    }
237  }
238 
239  void CEventScheduler::bcastEvent(const size_t timeLine, const size_t contextHashId, const size_t lev)
240  {
241    SPendingRequest* sentRequest ;
242     
243   
244    for(int i=0; i<nbChild[lev];i++)
245    {
246      sentRequest=new SPendingRequest ;
247      sentRequest->buffer[0]=timeLine ;
248      sentRequest->buffer[1]=contextHashId ;
249      sentRequest->buffer[2]=lev+1 ;
250      MPI_Isend(sentRequest->buffer,3, MPI_UNSIGNED_LONG, child[lev][i], 1, communicator, & sentRequest->request) ;
251      pendingSentChildRequest.push_back(sentRequest) ;
252    }
253  }
254   
255
256}
Note: See TracBrowser for help on using the repository browser.