source: XIOS/dev/dev_ym/XIOS_IMMEDIAT/src/context_server.cpp @ 2225

Last change on this file since 2225 was 2015, checked in by ymipsl, 3 years ago

ASSIM2K Branch :
Improve transfer protocol using mpi matching Probe / matching receive
YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 11.2 KB
Line 
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include <boost/functional/hash.hpp>
19
20
21
22namespace xios
23{
24
25  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
26  {
27    context=parent;
28    intraComm=intraComm_;
29    MPI_Comm_size(intraComm,&intraCommSize);
30    MPI_Comm_rank(intraComm,&intraCommRank);
31
32    interComm=interComm_;
33    int flag;
34    MPI_Comm_test_inter(interComm,&flag);
35    if (flag) MPI_Comm_remote_size(interComm,&commSize);
36    else  MPI_Comm_size(interComm,&commSize);
37
38    currentTimeLine=0;
39    scheduled=false;
40    finished=false;
41    boost::hash<string> hashString;
42    if (CServer::serverLevel == 1)
43      hashId=hashString(context->getId() + boost::lexical_cast<string>(context->clientPrimServer.size()));
44    else
45      hashId=hashString(context->getId());
46  }
47
48  void CContextServer::setPendingEvent(void)
49  {
50    pendingEvent=true;
51  }
52
53  bool CContextServer::hasPendingEvent(void)
54  {
55    return pendingEvent;
56  }
57
58  bool CContextServer::hasFinished(void)
59  {
60    return finished;
61  }
62
63  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
64  {
65    listen();
66    checkPendingProbe();
67    checkPendingRequest();
68    if (enableEventsProcessing)
69      processEvents();
70    return finished;
71  }
72/*
73  void CContextServer::listen(void)
74  {
75    int rank;
76    int flag;
77    int count;
78    char * addr;
79    MPI_Status status;
80    map<int,CServerBuffer*>::iterator it;
81    bool okLoop;
82
83    traceOff();
84    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
85    traceOn();
86
87    if (flag==true)
88    {
89      rank=status.MPI_SOURCE ;
90      okLoop = true;
91      if (pendingRequest.find(rank)==pendingRequest.end())
92        okLoop = !listenPendingRequest(status) ;
93      if (okLoop)
94      {
95        for(rank=0;rank<commSize;rank++)
96        {
97          if (pendingRequest.find(rank)==pendingRequest.end())
98          {
99
100            traceOff();
101            MPI_Iprobe(rank, 20,interComm,&flag,&status);
102            traceOn();
103            if (flag==true) listenPendingRequest(status) ;
104          }
105        }
106      }
107    }
108  }
109
110  bool CContextServer::listenPendingRequest(MPI_Status& status)
111  {
112    int count;
113    char * addr;
114    map<int,CServerBuffer*>::iterator it;
115    int rank=status.MPI_SOURCE ;
116
117    it=buffers.find(rank);
118    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
119    {
120       StdSize buffSize = 0;
121       MPI_Recv(&buffSize, 1, MPI_LONG, rank, 20, interComm, &status);
122       mapBufferSize_.insert(std::make_pair(rank, buffSize));
123       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first;
124       return true;
125    }
126    else
127    {
128      MPI_Get_count(&status,MPI_CHAR,&count);
129      if (it->second->isBufferFree(count))
130      {
131         addr=(char*)it->second->getBuffer(count);
132         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
133         bufferRequest[rank]=addr;
134         return true;
135       }
136      else
137        return false;
138    }
139  }
140
141*/
142
143  void CContextServer::listen(void)
144  {
145    int rank;
146    int flag;
147    int count;
148    char * addr;
149    MPI_Status status;
150    MPI_Message message ;
151    map<int,CServerBuffer*>::iterator it;
152    bool okLoop;
153
154    traceOff();
155    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
156    traceOn();
157    if (flag==true) listenPendingRequest(message, status) ;
158  }
159
160  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
161  {
162    int count;
163    char * addr;
164    map<int,CServerBuffer*>::iterator it;
165    int rank=status.MPI_SOURCE ;
166
167    it=buffers.find(rank);
168    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
169    {
170       StdSize buffSize = 0;
171       MPI_Mrecv(&buffSize, 1, MPI_LONG, &message, &status);
172       mapBufferSize_.insert(std::make_pair(rank, buffSize));
173       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(buffSize)))).first;
174       return true;
175    }
176    else
177    {
178//      MPI_Get_count(&status,MPI_CHAR,&count);
179//      if (it->second->isBufferFree(count))
180//      {
181//        addr=(char*)it->second->getBuffer(count);
182//         MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
183//         bufferRequest[rank]=addr;
184//         return true;
185//       }
186//      else
187//      {
188        pendingProbe[rank].push_back(make_pair<MPI_Message,MPI_Status>(message,status)) ;
189        return false;
190//      }
191    }
192  }
193
194  void CContextServer::checkPendingProbe(void)
195  {
196   
197    list<int> recvProbe ;
198    list<int>::iterator itRecv ;
199    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
200
201    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
202    {
203      int rank=itProbe->first ;
204      if (pendingRequest.count(rank)==0)
205      {
206        MPI_Message& message = itProbe->second.front().first ;
207        MPI_Status& status = itProbe->second.front().second ;
208        int count ;
209        MPI_Get_count(&status,MPI_CHAR,&count);
210        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
211        if (it->second->isBufferFree(count))
212        {
213          char * addr;
214          addr=(char*)it->second->getBuffer(count);
215          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
216          bufferRequest[rank]=addr;
217          recvProbe.push_back(rank) ;
218          itProbe->second.pop_front() ;
219        }
220      }
221    }
222
223    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
224  }
225
226  void CContextServer::checkPendingRequest(void)
227  {
228    map<int,MPI_Request>::iterator it;
229    list<int> recvRequest;
230    list<int>::iterator itRecv;
231    int rank;
232    int flag;
233    int count;
234    MPI_Status status;
235
236    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
237    {
238      rank=it->first;
239      traceOff();
240      MPI_Test(& it->second, &flag, &status);
241      traceOn();
242      if (flag==true)
243      {
244        recvRequest.push_back(rank);
245        MPI_Get_count(&status,MPI_CHAR,&count);
246        processRequest(rank,bufferRequest[rank],count);
247      }
248    }
249
250    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
251    {
252      pendingRequest.erase(*itRecv);
253      bufferRequest.erase(*itRecv);
254    }
255  }
256
257  void CContextServer::processRequest(int rank, char* buff,int count)
258  {
259
260    CBufferIn buffer(buff,count);
261    char* startBuffer,endBuffer;
262    int size, offset;
263    size_t timeLine;
264    map<size_t,CEventServer*>::iterator it;
265
266    CTimer::get("Process request").resume();
267    while(count>0)
268    {
269      char* startBuffer=(char*)buffer.ptr();
270      CBufferIn newBuffer(startBuffer,buffer.remain());
271      newBuffer>>size>>timeLine;
272
273      it=events.find(timeLine);
274      if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer)).first;
275      it->second->push(rank,buffers[rank],startBuffer,size);
276
277      buffer.advance(size);
278      count=buffer.remain();
279    }
280    CTimer::get("Process request").suspend();
281  }
282
283  void CContextServer::processEvents(void)
284  {
285    map<size_t,CEventServer*>::iterator it;
286    CEventServer* event;
287
288    it=events.find(currentTimeLine);
289    if (it!=events.end())
290    {
291      event=it->second;
292
293      if (event->isFull())
294      {
295        if (!scheduled && CServer::eventScheduler) // Skip event scheduling for attached mode and reception on client side
296        {
297          CServer::eventScheduler->registerEvent(currentTimeLine,hashId);
298          scheduled=true;
299        }
300        else if (!CServer::eventScheduler || CServer::eventScheduler->queryEvent(currentTimeLine,hashId) )
301        {
302         // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes
303         // The best way to properly solve this problem will be to use the event scheduler also in attached mode
304         // for now just set up a MPI barrier
305         if (!CServer::eventScheduler && CXios::isServer) MPI_Barrier(intraComm) ;
306
307         CTimer::get("Process events").resume();
308         dispatchEvent(*event);
309         CTimer::get("Process events").suspend();
310         pendingEvent=false;
311         delete event;
312         events.erase(it);
313         currentTimeLine++;
314         scheduled = false;
315        }
316      }
317    }
318  }
319
320  CContextServer::~CContextServer()
321  {
322    map<int,CServerBuffer*>::iterator it;
323    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
324  }
325
326  void CContextServer::dispatchEvent(CEventServer& event)
327  {
328    string contextName;
329    string buff;
330    int MsgSize;
331    int rank;
332    list<CEventServer::SSubEvent>::iterator it;
333    StdString ctxId = context->getId();
334    CContext::setCurrent(ctxId);
335    StdSize totalBuf = 0;
336
337    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
338    {
339      finished=true;
340      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
341      context->finalize();
342      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
343                           iteMap = mapBufferSize_.end(), itMap;
344      for (itMap = itbMap; itMap != iteMap; ++itMap)
345      {
346        rank = itMap->first;
347        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
348            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
349        totalBuf += itMap->second;
350      }
351      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
352    }
353    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
354    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
355    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
356    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
357    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
358    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
359    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
360    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
361    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
362    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
363    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
364    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
365    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
366    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
367    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
368    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
369    else
370    {
371      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
372    }
373  }
374}
Note: See TracBrowser for help on using the repository browser.