source: XIOS/dev/dev_olga/src/context_server.cpp @ 1193

Last change on this file since 1193 was 1193, checked in by oabramkina, 7 years ago

Two server levels: fix a bug during context finalization. The last buffer check on the connection from a server (classical or primary) to a client must be blocking; otherwise messages sent by the server to the client can be lost, causing a deadlock on the client side at context finalization.

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 8.2 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include <boost/functional/hash.hpp>
19
20
21
22namespace xios
23{
24
25  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
26  {
27    context=parent;
28    intraComm=intraComm_;
29    MPI_Comm_size(intraComm,&intraCommSize);
30    MPI_Comm_rank(intraComm,&intraCommRank);
31
32    interComm=interComm_;
33    int flag;
34    MPI_Comm_test_inter(interComm,&flag);
35    if (flag) MPI_Comm_remote_size(interComm,&commSize);
36    else  MPI_Comm_size(interComm,&commSize);
37
38    currentTimeLine=0;
39    scheduled=false;
40    finished=false;
41    boost::hash<string> hashString;
42    if (CServer::serverLevel == 1)
43      hashId=hashString(context->getId() + boost::lexical_cast<string>(context->clientPrimServer.size()));
44    else
45      hashId=hashString(context->getId());
46  }
47
48  void CContextServer::setPendingEvent(void)
49  {
50    pendingEvent=true;
51  }
52
53  bool CContextServer::hasPendingEvent(void)
54  {
55    return pendingEvent;
56  }
57
58  bool CContextServer::hasFinished(void)
59  {
60    return finished;
61  }
62
63  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
64  {
65    listen();
66    checkPendingRequest();
67    if (enableEventsProcessing)
68      processEvents();
69    return finished;
70  }
71
72  void CContextServer::listen(void)
73  {
74    int rank;
75    int flag;
76    int count;
77    char * addr;
78    MPI_Status status;
79    map<int,CServerBuffer*>::iterator it;
80
81    for(rank=0;rank<commSize;rank++)
82    {
83      if (pendingRequest.find(rank)==pendingRequest.end())
84      {
85        traceOff();
86        MPI_Iprobe(rank,20,interComm,&flag,&status);
87        traceOn();
88        if (flag==true)
89        {
90          it=buffers.find(rank);
91          if (it==buffers.end()) // Receive the buffer size and allocate the buffer
92          {
93            StdSize buffSize = 0;
94            MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status);
95            mapBufferSize_.insert(std::make_pair(rank, buffSize));
96            it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first;
97          }
98          else
99          {
100            MPI_Get_count(&status,MPI_CHAR,&count);
101            if (it->second->isBufferFree(count))
102            {
103              addr=(char*)it->second->getBuffer(count);
104              MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
105              bufferRequest[rank]=addr;
106            }
107          }
108        }
109      }
110    }
111  }
112
113  void CContextServer::checkPendingRequest(void)
114  {
115    map<int,MPI_Request>::iterator it;
116    list<int> recvRequest;
117    list<int>::iterator itRecv;
118    int rank;
119    int flag;
120    int count;
121    MPI_Status status;
122
123    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
124    {
125      rank=it->first;
126      traceOff();
127      MPI_Test(& it->second, &flag, &status);
128      traceOn();
129      if (flag==true)
130      {
131        recvRequest.push_back(rank);
132        MPI_Get_count(&status,MPI_CHAR,&count);
133        processRequest(rank,bufferRequest[rank],count);
134      }
135    }
136
137    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
138    {
139      pendingRequest.erase(*itRecv);
140      bufferRequest.erase(*itRecv);
141    }
142  }
143
144  void CContextServer::processRequest(int rank, char* buff,int count)
145  {
146
147    CBufferIn buffer(buff,count);
148    char* startBuffer,endBuffer;
149    int size, offset;
150    size_t timeLine;
151    map<size_t,CEventServer*>::iterator it;
152
153    while(count>0)
154    {
155      char* startBuffer=(char*)buffer.ptr();
156      CBufferIn newBuffer(startBuffer,buffer.remain());
157      newBuffer>>size>>timeLine;
158
159      it=events.find(timeLine);
160      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first;
161      it->second->push(rank,buffers[rank],startBuffer,size);
162
163      buffer.advance(size);
164      count=buffer.remain();
165    }
166  }
167
168  void CContextServer::processEvents(void)
169  {
170    map<size_t,CEventServer*>::iterator it;
171    CEventServer* event;
172
173    it=events.find(currentTimeLine);
174    if (it!=events.end())
175    {
176      event=it->second;
177
178      if (event->isFull())
179      {
180        if (!scheduled && CServer::eventScheduler) // Skip event scheduling for attached mode and reception on client side
181        {
182          CServer::eventScheduler->registerEvent(currentTimeLine,hashId);
183          scheduled=true;
184        }
185        else if (!CServer::eventScheduler || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) )
186        {
187         // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes
188         // The best way to properly solve this problem will be to use the event scheduler also in attached mode
189         // for now just set up a MPI barrier
190         if (!CServer::eventScheduler && CXios::isServer) MPI_Barrier(intraComm) ;
191
192         CTimer::get("Process events").resume();
193         dispatchEvent(*event);
194         CTimer::get("Process events").suspend();
195         pendingEvent=false;
196         delete event;
197         events.erase(it);
198         currentTimeLine++;
199         scheduled = false;
200        }
201      }
202    }
203  }
204
205  CContextServer::~CContextServer()
206  {
207    map<int,CServerBuffer*>::iterator it;
208    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
209  }
210
211  void CContextServer::dispatchEvent(CEventServer& event)
212  {
213    string contextName;
214    string buff;
215    int MsgSize;
216    int rank;
217    list<CEventServer::SSubEvent>::iterator it;
218    StdString ctxId = context->getId();
219    CContext::setCurrent(ctxId);
220    StdSize totalBuf = 0;
221
222    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
223    {
224      finished=true;
225      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
226                           iteMap = mapBufferSize_.end(), itMap;
227      for (itMap = itbMap; itMap != iteMap; ++itMap)
228      {
229        rank = itMap->first;
230        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
231            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
232        totalBuf += itMap->second;
233      }
234      context->finalize();
235      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
236    }
237    else if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_POST_FINALIZE)
238    {
239      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
240      context->postFinalize();
241    }
242    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
243    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
244    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
245    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
246    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
247    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
248    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
249    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
250    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
251    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
252    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
253    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
254    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
255    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
256    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
257    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
258    else
259    {
260      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
261    }
262  }
263}
Note: See TracBrowser for help on using the repository browser.