source: XIOS/dev/dev_ym/XIOS_COUPLING/src/transport/legacy_context_server.cpp @ 2343

Last change on this file since 2343 was 2343, checked in by ymipsl, 22 months ago
  • Implement new infrastructure for the transfer protocol.
  • A new, purely one-sided protocol is now available; the previous protocol (legacy, mixing send/recv and one-sided) is still available. Other specific protocols can be implemented more easily in the future.
  • The switch is operated with the "transport_protocol" variable in the XIOS context:

ex:
<variable id="transport_protocol" type="string">one_sided</variable>

Available protocols are: one_sided, legacy, or default. The default protocol is "legacy".
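
For example, the previous behaviour can also be requested explicitly (an illustrative setting, following the same syntax as the example above):

<variable id="transport_protocol" type="string">legacy</variable>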

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.9 KB
#include "legacy_context_server.hpp"
#include "buffer_in.hpp"
#include "type.hpp"
#include "context.hpp"
#include "object_template.hpp"
#include "group_template.hpp"
#include "attribute_template.hpp"
#include "domain.hpp"
#include "field.hpp"
#include "file.hpp"
#include "grid.hpp"
#include "mpi.hpp"
#include "tracer.hpp"
#include "timer.hpp"
#include "cxios.hpp"
#include "event_scheduler.hpp"
#include "server.hpp"
#include "servers_ressource.hpp"
#include "pool_ressource.hpp"
#include "services.hpp"
#include "contexts_manager.hpp"
#include "timeline_events.hpp"

#include <boost/functional/hash.hpp>
#include <random>
#include <chrono>


namespace xios
{
  using namespace std ;

  CLegacyContextServer::CLegacyContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
    : CContextServer(parent, intraComm_, interComm_),
      isProcessingEvent_(false)
  {

    MPI_Comm_dup(intraComm, &processEventBarrier_) ;


    currentTimeLine=1;
    scheduled=false;
    finished=false;

    if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows

    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode

  }

  void CLegacyContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CLegacyContextServer::hasPendingEvent(void)
  {
    return pendingEvent;
  }

  bool CLegacyContextServer::hasFinished(void)
  {
    return finished;
  }

  bool CLegacyContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    CTimer::get("listen request").resume();
    listen();
    CTimer::get("listen request").suspend();
    CTimer::get("check pending request").resume();
    checkPendingRequest();
    checkPendingProbe() ;
    CTimer::get("check pending request").suspend();
    CTimer::get("check event process").resume();
    processEvents(enableEventsProcessing);
    CTimer::get("check event process").suspend();
    return finished;
  }

  void CLegacyContextServer::listen(void)
  {
    int rank;
    int flag;
    int count;
    char * addr;
    MPI_Status status;
    MPI_Message message ;
    map<int,CServerBuffer*>::iterator it;
    bool okLoop;

    traceOff();
    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
    traceOn();
    if (flag==true) listenPendingRequest(message, status) ;
  }

  bool CLegacyContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
  {
    int count;
    char * addr;
    map<int,CServerBuffer*>::iterator it;
    int rank=status.MPI_SOURCE ;

    it=buffers.find(rank);
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
      MPI_Aint recvBuff[4] ;
      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
      remoteHashId_ = recvBuff[0] ;
      StdSize buffSize = recvBuff[1];
      vector<MPI_Aint> winAdress(2) ;
      winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
      mapBufferSize_.insert(std::make_pair(rank, buffSize));

      // create windows dynamically for one-sided
      if (!isAttachedModeEnabled())
      {
        CTimer::get("create Windows").resume() ;
        MPI_Comm interComm ;
        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ;
        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
        MPI_Comm_free(&interComm) ;
        windows_[rank].resize(2) ;
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
        CTimer::get("create Windows").suspend() ;
        MPI_Barrier(winComm_[rank]) ;
      }
      else
      {
        winComm_[rank] = MPI_COMM_NULL ;
        windows_[rank].resize(2) ;
        windows_[rank][0] = MPI_WIN_NULL ;
        windows_[rank][1] = MPI_WIN_NULL ;
      }

      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
      lastTimeLine[rank]=0 ;
      itLastTimeLine=lastTimeLine.begin() ;

      return true;
    }
    else
    {
        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
        pendingProbe[rank].push_back(mypair) ;
        return false;
    }
  }

  void CLegacyContextServer::checkPendingProbe(void)
  {

    list<int> recvProbe ;
    list<int>::iterator itRecv ;
    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;

    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
    {
      int rank=itProbe->first ;
      if (pendingRequest.count(rank)==0)
      {
        MPI_Message& message = itProbe->second.front().first ;
        MPI_Status& status = itProbe->second.front().second ;
        int count ;
        MPI_Get_count(&status,MPI_CHAR,&count);
        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
        if ( (it->second->isBufferFree(count) && !it->second->isResizing()) // accept a new request if the buffer is free
          || (it->second->isResizing() && it->second->isBufferEmpty()) )    // or, while resizing, once the buffer is empty
        {
          char * addr;
          addr=(char*)it->second->getBuffer(count);
          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
          bufferRequest[rank]=addr;
          recvProbe.push_back(rank) ;
          itProbe->second.pop_front() ;
        }
      }
    }

    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
  }


  void CLegacyContextServer::checkPendingRequest(void)
  {
    map<int,MPI_Request>::iterator it;
    list<int> recvRequest;
    list<int>::iterator itRecv;
    int rank;
    int flag;
    int count;
    MPI_Status status;

    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
    else CTimer::get("receiving requests").suspend();

    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
    {
      rank=it->first;
      traceOff();
      MPI_Test(& it->second, &flag, &status);
      traceOn();
      if (flag==true)
      {
        buffers[rank]->updateCurrentWindows() ;
        recvRequest.push_back(rank);
        MPI_Get_count(&status,MPI_CHAR,&count);
        processRequest(rank,bufferRequest[rank],count);
      }
    }

    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
    {
      pendingRequest.erase(*itRecv);
      bufferRequest.erase(*itRecv);
    }
  }

  void CLegacyContextServer::getBufferFromClient(size_t timeLine)
  {
    CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ;
    if (!isAttachedModeEnabled()) // one-sided transfers are deactivated in attached mode
    {
      int rank ;
      char *buffer ;
      size_t count ;

      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
      {
        rank=itLastTimeLine->first ;
        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
        {
          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
          if (count >= 0) ++itLastTimeLine ;
          break ;
        }
      }
    }
    CTimer::get("CLegacyContextServer::getBufferFromClient").suspend() ;
  }


  void CLegacyContextServer::processRequest(int rank, char* buff, int count)
  {

    CBufferIn buffer(buff,count);
    char *startBuffer, *endBuffer;
    int size, offset;
    size_t timeLine=0;
    map<size_t,CEventServer*>::iterator it;


    CTimer::get("Process request").resume();
    while(count>0) // each message in the buffer is prefixed by its size and its timeline id
    {
      char* startBuffer=(char*)buffer.ptr();
      CBufferIn newBuffer(startBuffer,buffer.remain());
      newBuffer>>size>>timeLine;

      if (timeLine==timelineEventNotifyChangeBufferSize)
      {
        buffers[rank]->notifyBufferResizing() ;
        buffers[rank]->updateCurrentWindows() ;
        buffers[rank]->popBuffer(count) ;
        info(100)<<"Context id "<<context->getId()<<" : Receive NotifyChangeBufferSize from client rank "<<rank<<endl
                 <<"isBufferEmpty ? "<<buffers[rank]->isBufferEmpty()<<"  remaining count : "<<buffers[rank]->getUsed()<<endl;
      }
      else if (timeLine==timelineEventChangeBufferSize)
      {
        size_t newSize ;
        vector<MPI_Aint> winAdress(2) ;
        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
        buffers[rank]->freeBuffer(count) ;
        delete buffers[rank] ;
        buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, 2*newSize) ;
        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank
                 <<"  newSize : "<<newSize<<" Address : "<<winAdress[0]<<" & "<<winAdress[1]<<endl ;
      }
      else
      {
        info(100)<<"Context id "<<context->getId()<<" : Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
        it=events.find(timeLine);

        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
        it->second->push(rank,buffers[rank],startBuffer,size);
        if (timeLine>0) lastTimeLine[rank]=timeLine ;
      }
      buffer.advance(size);
      count=buffer.remain();
    }

    CTimer::get("Process request").suspend();
  }

  void CLegacyContextServer::processEvents(bool enableEventsProcessing)
  {
    map<size_t,CEventServer*>::iterator it;
    CEventServer* event;

//    if (context->isProcessingEvent()) return ;
    if (isProcessingEvent_) return ;
    if (isAttachedModeEnabled())
      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;

    it=events.find(currentTimeLine);
    if (it!=events.end())
    {
      event=it->second;

      if (event->isFull())
      {
        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {
          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ;

          if (!eventScheduled_)
          {
            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
            eventScheduled_=true ;
            return ;
          }
          else
          {
            MPI_Status status ;
            int flag ;
            MPI_Test(&processEventRequest_, &flag, &status) ;
            if (!flag) return ;
            eventScheduled_=false ;
          }

          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
          //MPI_Barrier(intraComm) ;
          // When using attached mode, synchronise the processes to avoid different events being scheduled by different processes.
          // The best way to properly solve this problem would be to use the event scheduler also in attached mode;
          // for now just set up an MPI barrier.
//ym to be checked later
//          if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;

//          context->setProcessingEvent() ;
          isProcessingEvent_=true ;
          CTimer::get("Process events").resume();
          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
          dispatchEvent(*event);
          CTimer::get("Process events").suspend();
          isProcessingEvent_=false ;
//          context->unsetProcessingEvent() ;
          pendingEvent=false;
          delete event;
          events.erase(it);
          currentTimeLine++;
          scheduled = false;
          if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
        }
      }
      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
    }
    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // if pure one sided, check the buffer even if no event is recorded at the current time line
  }

  CLegacyContextServer::~CLegacyContextServer()
  {
    map<int,CServerBuffer*>::iterator it;
    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
    buffers.clear() ;
  }

  void CLegacyContextServer::releaseBuffers()
  {
    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
    //buffers.clear() ;
    freeWindows() ;
  }

  void CLegacyContextServer::freeWindows()
  {
    //if (!isAttachedModeEnabled())
    //{
    //  for(auto& it : winComm_)
    //  {
    //    int rank = it.first ;
    //    MPI_Win_free(&windows_[rank][0]);
    //    MPI_Win_free(&windows_[rank][1]);
    //    MPI_Comm_free(&winComm_[rank]) ;
    //  }
    //}
  }

  void CLegacyContextServer::notifyClientsFinalize(void)
  {
    for(auto it=buffers.begin();it!=buffers.end();++it)
    {
      it->second->notifyClientFinalize() ;
    }
  }

  void CLegacyContextServer::dispatchEvent(CEventServer& event)
  {
    string contextName;
    string buff;
    int MsgSize;
    int rank;
    list<CEventServer::SSubEvent>::iterator it;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      finished=true;
      info(20)<<" CLegacyContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
      notifyClientsFinalize() ;
      CTimer::get("receiving requests").suspend();
      context->finalize();

      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CLegacyContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }

  bool CLegacyContextServer::isCollectiveEvent(CEventServer& event)
  {
    if (event.type>1000) return false ;
    else return true ;
  }
}