source: XIOS/trunk/src/context_server.cpp @ 492

Last change on this file since 492 was 492, checked in by ymipsl, 10 years ago

Add event scheduler functionnality in order to schedule events from different context, that cause Deadlock or crash when using collective MPI communication in netcdf/hdf5 library.

YM

  • Property svn:eol-style set to native
File size: 6.4 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include <boost/functional/hash.hpp>
19
20
21
22namespace xios
23{
24
25  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
26  {
27    context=parent ;
28    intraComm=intraComm_ ;
29    MPI_Comm_size(intraComm,&intraCommSize) ;
30    MPI_Comm_rank(intraComm,&intraCommRank) ;
31    interComm=interComm_ ;
32    int flag ;
33    MPI_Comm_test_inter(interComm,&flag) ;
34    if (flag) MPI_Comm_remote_size(interComm,&commSize);
35    else  MPI_Comm_size(interComm,&commSize) ;
36    currentTimeLine=0 ;
37    scheduled=false ;
38    finished=false ;
39   
40    boost::hash<string> hashString ;
41    hashId=hashString(context->getId()) ;
42
43  }
44  void CContextServer::setPendingEvent(void)
45  {
46    pendingEvent=true ;
47  }
48
49  bool CContextServer::hasPendingEvent(void)
50  {
51    return pendingEvent ;
52  }
53
54  bool CContextServer::eventLoop(void)
55  {
56    listen() ;
57    checkPendingRequest() ;
58    processEvents() ;
59    return finished ;
60  }
61
62  void CContextServer::listen(void)
63  {
64    int rank;
65    int flag ;
66    int count ;
67    char * addr ;
68    MPI_Status status;
69    map<int,CServerBuffer*>::iterator it;
70
71    for(rank=0;rank<commSize;rank++)
72    {
73      if (pendingRequest.find(rank)==pendingRequest.end())
74      {
75        traceOff() ;
76        MPI_Iprobe(rank,20,interComm,&flag,&status);
77        traceOn() ;
78        if (flag==true)
79        {
80          it=buffers.find(rank) ;
81          if (it==buffers.end())
82            it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer))).first ;
83          MPI_Get_count(&status,MPI_CHAR,&count) ;
84          if (it->second->isBufferFree(count))
85          {
86            addr=(char*)it->second->getBuffer(count) ;
87            MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]) ;
88            bufferRequest[rank]=addr ;
89          }
90        }
91      }
92    }
93  }
94
95  void CContextServer::checkPendingRequest(void)
96  {
97    map<int,MPI_Request>::iterator it;
98    list<int> recvRequest ;
99    list<int>::iterator itRecv;
100    int rank ;
101    int flag ;
102    int count ;
103    MPI_Status status ;
104
105    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
106    {
107      rank=it->first ;
108      traceOff() ;
109      MPI_Test(& it->second, &flag, &status) ;
110      traceOn() ;
111      if (flag==true)
112      {
113        recvRequest.push_back(rank) ;
114        MPI_Get_count(&status,MPI_CHAR,&count) ;
115        processRequest(rank,bufferRequest[rank],count) ;
116      }
117    }
118
119    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
120    {
121      pendingRequest.erase(*itRecv) ;
122      bufferRequest.erase(*itRecv) ;
123    }
124  }
125
126  void CContextServer::processRequest(int rank, char* buff,int count)
127  {
128
129    CBufferIn buffer(buff,count) ;
130    char* startBuffer,endBuffer ;
131    int size, offset ;
132    size_t timeLine ;
133    map<size_t,CEventServer*>::iterator it ;
134
135    while(count>0)
136    {
137      char* startBuffer=(char*)buffer.ptr() ;
138      CBufferIn newBuffer(startBuffer,buffer.remain()) ;
139      newBuffer>>size>>timeLine ;
140
141      it=events.find(timeLine) ;
142      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first ;
143      it->second->push(rank,buffers[rank],startBuffer,size) ;
144
145      buffer.advance(size) ;
146      count=buffer.remain() ;
147    }
148
149  }
150
151  void CContextServer::processEvents(void)
152  {
153    map<size_t,CEventServer*>::iterator it ;
154    CEventServer* event ;
155
156    it=events.find(currentTimeLine) ;
157    if (it!=events.end())
158    {
159      event=it->second ;
160     
161      if (event->isFull())
162      {
163        if (!scheduled && !CXios::isServer)
164        {
165          CServer::eventScheduler->registerEvent(currentTimeLine,hashId) ; 
166          scheduled=true ;
167        }
168        else if (CXios::isServer || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) ) 
169        {
170         CTimer::get("Process events").resume() ;
171         dispatchEvent(*event) ;
172         CTimer::get("Process events").suspend() ;
173         pendingEvent=false ;
174         delete event ;
175         events.erase(it) ;
176         currentTimeLine++ ;
177         scheduled = false ;
178        }
179      }
180    }
181  }
182
183  CContextServer::~CContextServer()
184  {
185    map<int,CServerBuffer*>::iterator it ;
186    for(it=buffers.begin();it!=buffers.end();++it) delete it->second ;
187  }
188
189
190  void CContextServer::dispatchEvent(CEventServer& event)
191  {
192    string contextName ;
193    string buff ;
194    int MsgSize ;
195    int rank ;
196    list<CEventServer::SSubEvent>::iterator it ;
197    CContext::setCurrent(context->getId()) ;
198
199    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
200    {
201      info(20)<<"Server Side context <"<<context->getId()<<"> finalized"<<endl ;
202      context->finalize() ;
203      finished=true ;
204      report(0)<< " Memory report : Context <"<<context->getId()<<"> : server side : total memory used for buffer "<<buffers.size()*CXios::bufferSize<<" bytes"<<endl ;
205    }
206    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event) ;
207    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event) ;
208    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event) ;
209    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event) ;
210    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event) ;
211    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event) ;
212    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event) ;
213    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event) ;
214    else if (event.classId==CField::GetType()) CField::dispatchEvent(event) ;
215    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event) ;
216    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event) ;
217    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event) ;
218    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event) ;
219    else
220    {
221      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl) ;
222    }
223  }
224}
Note: See TracBrowser for help on using the repository browser.