source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 1853

Last change on this file since 1853 was 1853, checked in by ymipsl, 4 years ago

Coupling branch: replace the hasServer/hasClient combination with the name of the correct service: CLIENT, GATHERER or OUT_SERVER.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.1 KB
#include "context_server.hpp"
#include "buffer_in.hpp"
#include "type.hpp"
#include "context.hpp"
#include "object_template.hpp"
#include "group_template.hpp"
#include "attribute_template.hpp"
#include "domain.hpp"
#include "field.hpp"
#include "file.hpp"
#include "grid.hpp"
#include "mpi.hpp"
#include "tracer.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "event_scheduler.hpp"
#include "server.hpp"
#include "servers_ressource.hpp"
#include "pool_ressource.hpp"
#include "services.hpp"
#include "contexts_manager.hpp"

#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>

namespace xios
{
  using namespace std ;

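  //! Server-side endpoint of a context: receives event messages from the
  //! matching client, reassembles them per time line and dispatches them
  //! to the targeted objects.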
  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
  {
    context=parent;
    intraComm=intraComm_;
    MPI_Comm_size(intraComm,&intraCommSize);
    MPI_Comm_rank(intraComm,&intraCommRank);

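    // interComm_ is an intercommunicator when client and server run in
    // separate MPI groups (server mode) and an intracommunicator in attached mode.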
    interComm=interComm_;
    int flag;
    MPI_Comm_test_inter(interComm,&flag);

    if (flag) attachedMode=false ;
    else  attachedMode=true ;

    if (flag) MPI_Comm_remote_size(interComm,&commSize);
    else  MPI_Comm_size(interComm,&commSize);

    SRegisterContextInfo contextInfo ;
    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;

    if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieved from the associated service
    {
      if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
    }

    currentTimeLine=1;
    scheduled=false;
    finished=false;

    // generate a unique hash for the server
    auto time=chrono::system_clock::now().time_since_epoch().count() ;
    std::default_random_engine rd(time); // not reproducible from one run to another
    std::uniform_int_distribution<size_t> dist;
    hashId=dist(rd) ;
    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all servers of the context
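    // The broadcast overwrites every other rank's draw with that of rank 0, so
    // all servers of the context share a single hashId, used below to identify
    // events registered with the event scheduler.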
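
    // Server mode: merge the inter- and intra-communicators (high=true places
    // the servers after the commSize client ranks) and create two dynamic RMA
    // windows per server for one-sided transfers. MPI_Comm_split is collective
    // over interCommMerged, hence the loop over every server position even
    // though a given server only creates its windows on its own turn.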
    if (!isAttachedModeEnabled())
    {
      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
      // create windows for one sided comm
      int interCommMergedRank;
      MPI_Comm winComm ;
      MPI_Comm_rank(intraComm, &interCommMergedRank);
      windows.resize(2) ;
      for(int rank=commSize; rank<commSize+intraCommSize; rank++)
      {
        if (rank==commSize+interCommMergedRank)
        {
          MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
          int myRank ;
          MPI_Comm_rank(winComm,&myRank);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);
        }
        else MPI_Comm_split(interCommMerged, interCommMergedRank, rank, &winComm);
        MPI_Comm_free(&winComm) ;
      }
    }
    else
    {
      windows.resize(2) ;
      windows[0]=MPI_WIN_NULL ;
      windows[1]=MPI_WIN_NULL ;
    }

    MPI_Comm_split(intraComm_,intraCommRank,intraCommRank, &commSelf) ;
    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attached mode
  }

  //! Is attached mode in use?
  //! \return true if attached mode is used, false otherwise
  bool CContextServer::isAttachedModeEnabled() const
  {
    return attachedMode ;
  }

  void CContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CContextServer::hasPendingEvent(void)
  {
    return pendingEvent;
  }

  bool CContextServer::hasFinished(void)
  {
    return finished;
  }

  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    listen();
    checkPendingRequest();
    if (enableEventsProcessing)  processEvents();
    return finished;
  }

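  //! Probe for incoming client messages on tag 20. The probed rank is serviced
  //! first; if its message cannot be received yet (its buffer is full), every
  //! other rank without a pending receive is probed in turn.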
  void CContextServer::listen(void)
  {
    int rank;
    int flag;
    MPI_Status status;
    bool okLoop;

    traceOff();
    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
    traceOn();

    if (flag==true)
    {
      rank=status.MPI_SOURCE ;
      okLoop = true;
      if (pendingRequest.find(rank)==pendingRequest.end())
        okLoop = !listenPendingRequest(status) ;
      if (okLoop)
      {
        for(rank=0;rank<commSize;rank++)
        {
          if (pendingRequest.find(rank)==pendingRequest.end())
          {
            traceOff();
            MPI_Iprobe(rank, 20,interComm,&flag,&status);
            traceOn();
            if (flag==true) listenPendingRequest(status) ;
          }
        }
      }
    }
  }

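  //! Handle a probed message from a client. The first message of a client
  //! carries three MPI_Aint values (its buffer size and the addresses of its
  //! two RMA windows) and triggers allocation of the server-side buffer;
  //! subsequent messages are received asynchronously once enough buffer space
  //! is free. \return true if the message could be handled, false otherwise.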
  bool CContextServer::listenPendingRequest(MPI_Status& status)
  {
    int count;
    char * addr;
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
       MPI_Aint recvBuff[3] ;
       MPI_Recv(recvBuff, 3, MPI_AINT, rank, 20, interComm, &status);
       StdSize buffSize = recvBuff[0];
       vector<MPI_Aint> winAdress(2) ;
       winAdress[0]=recvBuff[1] ; winAdress[1]=recvBuff[2] ;
       mapBufferSize_.insert(std::make_pair(rank, buffSize));
       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;

       lastTimeLine[rank]=0 ;
       itLastTimeLine=lastTimeLine.begin() ;

       return true;
    }
    else
    {
      MPI_Get_count(&status,MPI_CHAR,&count);
      if (it->second->isBufferFree(count))
      {
         addr=(char*)it->second->getBuffer(count);
         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
         bufferRequest[rank]=addr;
         return true;
       }
      else
        return false;
    }
  }

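  //! Test every pending receive; each completed request is handed over to
  //! processRequest and then removed from the pending maps.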
  void CContextServer::checkPendingRequest(void)
  {
    map<int,MPI_Request>::iterator it;
    list<int> recvRequest;
    list<int>::iterator itRecv;
    int rank;
    int flag;
    int count;
    MPI_Status status;

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
    {
      rank=it->first;
      traceOff();
      MPI_Test(& it->second, &flag, &status);
      traceOn();
      if (flag==true)
      {
        buffers[rank]->updateCurrentWindows() ;
        recvRequest.push_back(rank);
        MPI_Get_count(&status,MPI_CHAR,&count);
        processRequest(rank,bufferRequest[rank],count);
      }
    }

    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
    {
      pendingRequest.erase(*itRecv);
      bufferRequest.erase(*itRecv);
    }
  }

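  //! One-sided path: while the event for timeLine is still incomplete, try to
  //! pull data directly from the RMA buffer of a client whose last received
  //! time line is older than the awaited one.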
  void CContextServer::getBufferFromClient(size_t timeLine)
  {
    if (!isAttachedModeEnabled()) // one sided deactivated in attached mode
    {
      int rank ;
      char *buffer ;
      size_t count ;

      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
      {
        rank=itLastTimeLine->first ;
        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0)
        {
          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count))
          {
            processRequest(rank, buffer, count);
            break ;
          }
        }
      }
    }
  }

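  //! Unpack a raw message: a sequence of records, each carrying its own size
  //! and time line followed by the event payload, pushed onto the CEventServer
  //! collecting that time line.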
  void CContextServer::processRequest(int rank, char* buff,int count)
  {
    CBufferIn buffer(buff,count);
    int size;
    size_t timeLine=0;
    map<size_t,CEventServer*>::iterator it;

    CTimer::get("Process request").resume();
    while(count>0)
    {
      char* startBuffer=(char*)buffer.ptr();
      CBufferIn newBuffer(startBuffer,buffer.remain());
      newBuffer>>size>>timeLine;
      it=events.find(timeLine);
      if (it==events.end()) it=events.insert(pair<size_t,CEventServer*>(timeLine,new CEventServer(this))).first;
      it->second->push(rank,buffers[rank],startBuffer,size);

      buffer.advance(size);
      count=buffer.remain();
    }

    if (timeLine>0) lastTimeLine[rank]=timeLine ;

    CTimer::get("Process request").suspend();
  }

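  //! Dispatch the event of the current time line once it is complete. Outside
  //! attached mode the event scheduler serialises processing across servers:
  //! the event is registered first, then dispatched when its turn is granted.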
  void CContextServer::processEvents(void)
  {
    map<size_t,CEventServer*>::iterator it;
    CEventServer* event;

//    if (context->isProcessingEvent()) return ;
    if (isProcessingEvent_) return ;

    it=events.find(currentTimeLine);
    if (it!=events.end())
    {
      event=it->second;

      if (event->isFull())
      {
        if (!scheduled && eventScheduler_) // Skip event scheduling for attached mode and reception on client side
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (!eventScheduler_ || eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {
         // In attached mode, synchronise the processes to prevent different events
         // from being scheduled by different processes. The proper solution would be
         // to use the event scheduler in attached mode too; for now set up an MPI barrier.
         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;

//         context->setProcessingEvent() ;
         isProcessingEvent_=true ;
         CTimer::get("Process events").resume();
         dispatchEvent(*event);
         CTimer::get("Process events").suspend();
         isProcessingEvent_=false ;
//         context->unsetProcessingEvent() ;
         pendingEvent=false;
         delete event;
         events.erase(it);
         currentTimeLine++;
         scheduled = false;
        }
      }
      else getBufferFromClient(currentTimeLine) ;
    }
    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one sided, check buffers even if no event is recorded at the current time line
  }

  CContextServer::~CContextServer()
  {
    map<int,CServerBuffer*>::iterator it;
    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
  }

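  //! Intended to free the RMA windows of every buffer; the freeing call is
  //! currently commented out, so the loop exits after a single pass.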
  void CContextServer::releaseBuffers()
  {
    map<int,CServerBuffer*>::iterator it;
    bool out ;
    do
    {
      out=true ;
      for(it=buffers.begin();it!=buffers.end();++it)
      {
//        out = out && it->second->freeWindows() ;
      }
    } while (! out) ;
  }

  void CContextServer::notifyClientsFinalize(void)
  {
    for(auto it=buffers.begin();it!=buffers.end();++it)
    {
      it->second->notifyClientFinalize() ;
    }
  }

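  //! Route a complete event to the class identified by its classId. A context
  //! finalize event additionally notifies the clients, finalizes the context
  //! and prints a per-connection memory report.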
  void CContextServer::dispatchEvent(CEventServer& event)
  {
    string contextName;
    int rank;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      finished=true;
      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
//      releaseBuffers() ;
      notifyClientsFinalize() ;
      context->finalize();

/* don't know where to release the windows
      MPI_Win_free(&windows[0]) ;
      MPI_Win_free(&windows[1]) ;
*/
      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }
}