source: XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp @ 2246

Last change on this file since 2246 was 2246, checked in by ymipsl, 3 years ago
  • Update of the transfer protocol using one-sided communication
  • Introduce MPI_Improbe/MPI_Mrecv to listen for incoming requests (see the sketch below)
  • Introduce latency when looping over managers

YM
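For reference, the matched-probe pattern this change relies on works roughly as follows (a minimal stand-alone sketch, not XIOS code; the communicator `comm`, the tag value 20 and the helper name `receiveOne` are placeholders):

  #include <mpi.h>
  #include <vector>

  // Probe for one pending request of unknown size on communicator 'comm' (tag 20).
  // MPI_Improbe returns a matched message handle, so the later MPI_Mrecv is
  // guaranteed to receive exactly the probed message (unlike MPI_Iprobe + MPI_Recv).
  void receiveOne(MPI_Comm comm)
  {
    int flag;
    MPI_Message message;
    MPI_Status status;
    MPI_Improbe(MPI_ANY_SOURCE, 20, comm, &flag, &message, &status);
    if (flag)
    {
      int count;
      MPI_Get_count(&status, MPI_CHAR, &count);
      std::vector<char> buffer(count);
      MPI_Mrecv(buffer.data(), count, MPI_CHAR, &message, &status);
      // ... hand the buffer off for processing ...
    }
  }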

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 19.0 KB
1#include "context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
  CContextServer::CContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
    : eventScheduler_(nullptr), isProcessingEvent_(false), associatedClient_(nullptr)
  {
    context=parent;
    intraComm=intraComm_;
    MPI_Comm_size(intraComm,&intraCommSize);
    MPI_Comm_rank(intraComm,&intraCommRank);

    interComm=interComm_;
    int flag;
    MPI_Comm_test_inter(interComm,&flag);

    if (flag) attachedMode=false ;
    else  attachedMode=true ;

    int clientSize ;
    if (flag) MPI_Comm_remote_size(interComm,&clientSize);
    else  MPI_Comm_size(interComm,&clientSize);


    SRegisterContextInfo contextInfo ;
    CXios::getContextsManager()->getContextInfo(context->getId(), contextInfo, intraComm) ;

  //  if (contextInfo.serviceType != CServicesManager::CLIENT) // we must have an event scheduler => to be retrieved from the associated services
  //  {
      //if (!isAttachedModeEnabled()) eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
    eventScheduler_=CXios::getPoolRessource()->getService(contextInfo.serviceId,contextInfo.partitionId)->getEventScheduler() ;
    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
  //  }


    currentTimeLine=1;
    scheduled=false;
    finished=false;

    // generate unique hash for server
    auto time=chrono::system_clock::now().time_since_epoch().count() ;
    std::default_random_engine rd(time); // not reproducible from one run to another
    std::uniform_int_distribution<size_t> dist;
    hashId=dist(rd) ;
    MPI_Bcast(&hashId,1,MPI_SIZE_T,0,intraComm) ; // Bcast to all servers of the context

    if (!isAttachedModeEnabled())
    {
      CTimer::get("create Windows").resume() ;

      MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;

      // We create a dummy pair of intercommunicators between clients and server.
      // Why? On OpenMPI it reduces the creation time of the windows, which otherwise increases quadratically.
      // We don't know the reason.
      MPI_Comm commSelf ;
      MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ;
      vector<MPI_Comm> dummyComm(clientSize) ;
      for(int rank=0; rank<clientSize ; rank++) MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &dummyComm[rank]) ;

      // create windows for one-sided comm
      MPI_Comm winComm ;
      windows.resize(2) ;
      for(int rank=clientSize; rank<clientSize+intraCommSize; rank++)
      {
        if (rank==clientSize+intraCommRank) 
        {
          MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[0]);
          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm, &windows[1]);
        }
        else MPI_Comm_split(interCommMerged, intraCommRank, rank, &winComm);
        //       ym : Warning : Intel MPI doesn't support deallocating the communicator of a window before the window itself is deallocated; crash at MPI_Win_lock.
        //            Bug or not?
        //         MPI_Comm_free(&winComm) ;
      }

      // free dummy intercommunicators
      for(int rank=0; rank<clientSize ; rank++)  MPI_Comm_free(&dummyComm[rank]) ;
      MPI_Comm_free(&commSelf) ;
      CTimer::get("create Windows").suspend() ;
    }
    else 
    {
      windows.resize(2) ;
      windows[0]=MPI_WIN_NULL ;
      windows[1]=MPI_WIN_NULL ;
    }

    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one-sided communication (for test)
    if (isAttachedModeEnabled()) pureOneSided=false ; // no one-sided in attached mode

  }

//! Is attached mode used?
//! \return true if attached mode is used, false otherwise
  bool CContextServer::isAttachedModeEnabled() const
  {
    return attachedMode ;
  }

  void CContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CContextServer::hasPendingEvent(void)
  {
    return pendingEvent;
  }

  bool CContextServer::hasFinished(void)
  {
    return finished;
  }

  bool CContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    CTimer::get("listen request").resume();
    listen();
    CTimer::get("listen request").suspend();
    CTimer::get("check pending request").resume();
    checkPendingRequest();
    checkPendingProbe() ;
    CTimer::get("check pending request").suspend();
    CTimer::get("check event process").resume();
    if (enableEventsProcessing)  processEvents();
    CTimer::get("check event process").suspend();
    return finished;
  }
/*
  void CContextServer::listen(void)
  {
    int rank;
    int flag;
    int count;
    char * addr;
    MPI_Status status;
    map<int,CServerBuffer*>::iterator it;
    bool okLoop;

    traceOff();
    // WARNING : with intel MPI, probing crash on an intercommunicator with release library but not with release_mt
    // ==>  source $I_MPI_ROOT/intel64/bin/mpivars.sh release_mt    needed
    MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm,&flag,&status);
    traceOn();

    if (flag==true)
    {
      rank=status.MPI_SOURCE ;
      okLoop = true;
      if (pendingRequest.find(rank)==pendingRequest.end())
        okLoop = !listenPendingRequest(status) ;
      if (okLoop)
      {
        for(rank=0;rank<commSize;rank++)
        {
          if (pendingRequest.find(rank)==pendingRequest.end())
          {

            traceOff();
            MPI_Iprobe(rank, 20,interComm,&flag,&status);
            traceOn();
            if (flag==true) listenPendingRequest(status) ;
          }
        }
      }
    }
  }

  bool CContextServer::listenPendingRequest(MPI_Status& status)
  {
    int count;
    char * addr;
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
       MPI_Aint recvBuff[4] ;
       MPI_Recv(recvBuff, 4, MPI_AINT, rank, 20, interComm, &status);
       remoteHashId_ = recvBuff[0] ;
       StdSize buffSize = recvBuff[1];
       vector<MPI_Aint> winAdress(2) ;
       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
       mapBufferSize_.insert(std::make_pair(rank, buffSize));
       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;

       lastTimeLine[rank]=0 ;
       itLastTimeLine=lastTimeLine.begin() ;

       return true;
    }
    else
    {
      MPI_Get_count(&status,MPI_CHAR,&count);
      if (it->second->isBufferFree(count))
      {
         addr=(char*)it->second->getBuffer(count);
         MPI_Irecv(addr,count,MPI_CHAR,rank,20,interComm,&pendingRequest[rank]);
         bufferRequest[rank]=addr;
         return true;
       }
      else
        return false;
    }
  }
*/

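  //! Probe once for a new incoming request on the intercommunicator (tag 20)
  //! using a matched probe; if a message is pending, hand the matched handle
  //! over to listenPendingRequest().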
  void CContextServer::listen(void)
  {
    int rank;
    int flag;
    int count;
    char * addr;
    MPI_Status status;
    MPI_Message message ;
    map<int,CServerBuffer*>::iterator it;
    bool okLoop;

    traceOff();
    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
    traceOn();
    if (flag==true) listenPendingRequest(message, status) ;
  }

  bool CContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
  {
    int count;
    char * addr;
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
       MPI_Aint recvBuff[4] ;
       MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
       remoteHashId_ = recvBuff[0] ;
       StdSize buffSize = recvBuff[1];
       vector<MPI_Aint> winAdress(2) ;
       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
       mapBufferSize_.insert(std::make_pair(rank, buffSize));
       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, buffSize)))).first;
       lastTimeLine[rank]=0 ;
       itLastTimeLine=lastTimeLine.begin() ;
       return true;
    }
    else
    {
        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
        pendingProbe[rank].push_back(mypair) ;
        return false;
    }
  }

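  //! Go through the probes deferred by listenPendingRequest(): for each client rank
  //! with no receive already in flight, post an MPI_Imrecv for the oldest matched
  //! message as soon as the server buffer has enough free space.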
  void CContextServer::checkPendingProbe(void)
  {

    list<int> recvProbe ;
    list<int>::iterator itRecv ;
    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;

    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
    {
      int rank=itProbe->first ;
      if (pendingRequest.count(rank)==0)
      {
        MPI_Message& message = itProbe->second.front().first ;
        MPI_Status& status = itProbe->second.front().second ;
        int count ;
        MPI_Get_count(&status,MPI_CHAR,&count);
        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
        if (it->second->isBufferFree(count))
        {
          char * addr;
          addr=(char*)it->second->getBuffer(count);
          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
          bufferRequest[rank]=addr;
          recvProbe.push_back(rank) ;
          itProbe->second.pop_front() ;
        }
      }
    }

    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
  }


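  //! Test the pending (matched) receives; each completed transfer is passed to
  //! processRequest() and its bookkeeping entries are removed.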
  void CContextServer::checkPendingRequest(void)
  {
    map<int,MPI_Request>::iterator it;
    list<int> recvRequest;
    list<int>::iterator itRecv;
    int rank;
    int flag;
    int count;
    MPI_Status status;

    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
    else CTimer::get("receiving requests").suspend();

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
    {
      rank=it->first;
      traceOff();
      MPI_Test(& it->second, &flag, &status);
      traceOn();
      if (flag==true)
      {
        buffers[rank]->updateCurrentWindows() ;
        recvRequest.push_back(rank);
        MPI_Get_count(&status,MPI_CHAR,&count);
        processRequest(rank,bufferRequest[rank],count);
      }
    }

    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
    {
      pendingRequest.erase(*itRecv);
      bufferRequest.erase(*itRecv);
    }
  }

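  //! Actively fetch outstanding data for the given timeline directly from the
  //! client-exposed windows (one-sided); skipped in attached mode. Only ranks whose
  //! last received timeline is behind, with no pending request and an empty server
  //! buffer, are polled.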
  void CContextServer::getBufferFromClient(size_t timeLine)
  {
    CTimer::get("CContextServer::getBufferFromClient").resume() ;
    if (!isAttachedModeEnabled()) // one-sided deactivated in attached mode
    {
      int rank ;
      char *buffer ;
      size_t count ;

      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
      {
        rank=itLastTimeLine->first ;
        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
        {
          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
          if (count >= 0) break ;
        }
      }
    }
    CTimer::get("CContextServer::getBufferFromClient").suspend() ;
  }


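  //! Unpack a received buffer: each message carries its size and a timeline id.
  //! Special timeline values trigger buffer-resizing notification or buffer
  //! reallocation; regular events are queued in 'events' under their timeline.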
  void CContextServer::processRequest(int rank, char* buff,int count)
  {

    CBufferIn buffer(buff,count);
    char* startBuffer, *endBuffer;
    int size, offset;
    size_t timeLine=0;
    map<size_t,CEventServer*>::iterator it;


    CTimer::get("Process request").resume();
    while(count>0)
    {
      char* startBuffer=(char*)buffer.ptr();
      CBufferIn newBuffer(startBuffer,buffer.remain());
      newBuffer>>size>>timeLine;

      if (timeLine==timelineEventNotifyChangeBufferSize)
      {
        buffers[rank]->notifyBufferResizing() ;
        buffers[rank]->updateCurrentWindows() ;
      }
      else if (timeLine==timelineEventChangeBufferSize)
      {
        size_t newSize ;
        vector<MPI_Aint> winAdress(2) ;
        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
        buffers.erase(rank) ;
        buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows, winAdress, rank, newSize)));
      }
      else
      {
        it=events.find(timeLine);
        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
        it->second->push(rank,buffers[rank],startBuffer,size);
        if (timeLine>0) lastTimeLine[rank]=timeLine ;
      }
      buffer.advance(size);
      count=buffer.remain();
    }

    CTimer::get("Process request").suspend();
  }

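  //! Process the event of the current timeline once it is complete. In server mode
  //! the event is first registered with the event scheduler, then the processes of
  //! the context synchronise through a non-blocking barrier before dispatching; in
  //! attached mode the daemons manager decides which context may run.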
  void CContextServer::processEvents(void)
  {
    map<size_t,CEventServer*>::iterator it;
    CEventServer* event;

//    if (context->isProcessingEvent()) return ;
    if (isProcessingEvent_) return ;
    if (isAttachedModeEnabled())
      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;

    it=events.find(currentTimeLine);
    if (it!=events.end())
    {
      event=it->second;

      if (event->isFull())
      {
        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {

          if (!eventScheduled_) 
          {
            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
            eventScheduled_=true ;
            return ;
          }
          else 
          {
            MPI_Status status ;
            int flag ;
            MPI_Test(&processEventRequest_, &flag, &status) ;
            if (!flag) return ;
            eventScheduled_=false ;
          }

          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
          //MPI_Barrier(intraComm) ;
         // In attached mode, synchronise the processes to avoid different events being scheduled by different processes.
         // The best way to properly solve this problem would be to use the event scheduler in attached mode as well;
         // for now just set up an MPI barrier.
//ym to be checked later
//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;

//         context->setProcessingEvent() ;
         isProcessingEvent_=true ;
         CTimer::get("Process events").resume();
         info(100)<<"Received Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
         dispatchEvent(*event);
         CTimer::get("Process events").suspend();
         isProcessingEvent_=false ;
//         context->unsetProcessingEvent() ;
         pendingEvent=false;
         delete event;
         events.erase(it);
         currentTimeLine++;
         scheduled = false;
         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
        }
      }
      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
    }
    else if (pureOneSided) getBufferFromClient(currentTimeLine) ; // if pure one-sided, check the buffer even if no event is recorded at the current timeline
  }

  CContextServer::~CContextServer()
  {
    map<int,CServerBuffer*>::iterator it;
    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
  }

  void CContextServer::releaseBuffers()
  {
    map<int,CServerBuffer*>::iterator it;
    bool out ;
    do
    {
      out=true ;
      for(it=buffers.begin();it!=buffers.end();++it)
      {
//        out = out && it->second->freeWindows() ;

      }
    } while (! out) ;
    MPI_Win_free(&windows[0]) ;
    MPI_Win_free(&windows[1]) ;
  }

  void CContextServer::notifyClientsFinalize(void)
  {
    for(auto it=buffers.begin();it!=buffers.end();++it)
    {
      it->second->notifyClientFinalize() ;
    }
  }

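  //! Route a complete event to the handler of its class (context, domain, axis,
  //! grid, field, file, variable, ...); the context finalize event additionally
  //! frees the one-sided windows and reports per-client buffer memory usage.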
  void CContextServer::dispatchEvent(CEventServer& event)
  {
    string contextName;
    string buff;
    int MsgSize;
    int rank;
    list<CEventServer::SSubEvent>::iterator it;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      finished=true;
      info(20)<<" CContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
//      releaseBuffers() ;
      notifyClientsFinalize() ;
      CTimer::get("receiving requests").suspend();
      context->finalize();

// don't know where to release the windows
      MPI_Win_free(&windows[0]) ;
      MPI_Win_free(&windows[1]) ;

      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }

  bool CContextServer::isCollectiveEvent(CEventServer& event)
  {
    if (event.classId==CField::GetType()) return CField::isCollectiveEvent(event);
    else return true ;
  }
}