source: XIOS3/branches/xios-3.0-beta/src/transport/legacy_context_server.cpp @ 2432

Last change on this file since 2432 was 2432, checked in by jderouillat, 17 months ago

Fix the evaluated buffer sizes for fields, and shrink the memory overbooking of buffers on servers (legacy)

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.9 KB
Line 
1#include "legacy_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
33  CLegacyContextServer::CLegacyContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
34    : CContextServer(parent, intraComm_, interComm_),
35      isProcessingEvent_(false)
36  {
37 
38    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
39 
40
41    currentTimeLine=1;
42    scheduled=false;
43    finished=false;
44
45    if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
46    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows
47   
48    itLastTimeLine=lastTimeLine.begin() ;
49
50    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
51    if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode
52     
53  }
54 
55  void CLegacyContextServer::setPendingEvent(void)
56  {
57    pendingEvent=true;
58  }
59
60  bool CLegacyContextServer::hasPendingEvent(void)
61  {
62    return pendingEvent;
63  }
64
65  bool CLegacyContextServer::hasFinished(void)
66  {
67    return finished;
68  }
69
70  bool CLegacyContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
71  {
72    CTimer::get("listen request").resume();
73    listen();
74    CTimer::get("listen request").suspend();
75    CTimer::get("check pending request").resume();
76    checkPendingRequest();
77    checkPendingProbe() ;
78    CTimer::get("check pending request").suspend();
79    CTimer::get("check event process").resume();
80    processEvents(enableEventsProcessing);
81    CTimer::get("check event process").suspend();
82    return finished;
83  }
84
85 void CLegacyContextServer::listen(void)
86  {
87    int rank;
88    int flag;
89    int count;
90    char * addr;
91    MPI_Status status;
92    MPI_Message message ;
93    map<int,CServerBuffer*>::iterator it;
94    bool okLoop;
95
96    traceOff();
97    MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status);
98    traceOn();
99    if (flag==true) listenPendingRequest(message, status) ;
100  }
101
102  bool CLegacyContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
103  {
104    int count;
105    char * addr;
106    map<int,CServerBuffer*>::iterator it;
107    int rank=status.MPI_SOURCE ;
108
109    it=buffers.find(rank);
110    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
111    {
112      MPI_Aint recvBuff[4] ;
113      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
114      remoteHashId_ = recvBuff[0] ;
115      StdSize buffSize = recvBuff[1];
116      vector<MPI_Aint> winAdress(2) ;
117      winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
118      mapBufferSize_.insert(std::make_pair(rank, buffSize));
119
120      // create windows dynamically for one-sided
121      if (!isAttachedModeEnabled())
122      { 
123        CTimer::get("create Windows").resume() ;
124        MPI_Comm interComm ;
125        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ;
126        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
127        CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
128        MPI_Comm_free(&interComm) ;
129        windows_[rank].resize(2) ;
130        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
131        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
132        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
133        CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
134        CTimer::get("create Windows").suspend() ;
135        MPI_Barrier(winComm_[rank]) ;
136      }
137      else
138      {
139        winComm_[rank] = MPI_COMM_NULL ;
140        windows_[rank].resize(2) ;
141        windows_[rank][0] = MPI_WIN_NULL ;
142        windows_[rank][1] = MPI_WIN_NULL ;
143      }   
144
145      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
146      lastTimeLine[rank]=0 ;
147      itLastTimeLine=lastTimeLine.begin() ;
148
149      return true;
150    }
151    else
152    {
153        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
154        pendingProbe[rank].push_back(mypair) ;
155        return false;
156    }
157  }
158
159  void CLegacyContextServer::checkPendingProbe(void)
160  {
161   
162    list<int> recvProbe ;
163    list<int>::iterator itRecv ;
164    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
165
166    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
167    {
168      int rank=itProbe->first ;
169      if (pendingRequest.count(rank)==0)
170      {
171        MPI_Message& message = itProbe->second.front().first ;
172        MPI_Status& status = itProbe->second.front().second ;
173        int count ;
174        MPI_Get_count(&status,MPI_CHAR,&count);
175        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
176        if ( (it->second->isBufferFree(count) && !it->second->isResizing()) // accept new request if buffer is free
177          || (it->second->isResizing() && it->second->isBufferEmpty()) )    // or if resizing wait for buffer is empty
178        {
179          char * addr;
180          addr=(char*)it->second->getBuffer(count);
181          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
182          bufferRequest[rank]=addr;
183          recvProbe.push_back(rank) ;
184          itProbe->second.pop_front() ;
185        }
186      }
187    }
188
189    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
190  }
191
192
193  void CLegacyContextServer::checkPendingRequest(void)
194  {
195    map<int,MPI_Request>::iterator it;
196    list<int> recvRequest;
197    list<int>::iterator itRecv;
198    int rank;
199    int flag;
200    int count;
201    MPI_Status status;
202   
203    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
204    else CTimer::get("receiving requests").suspend();
205
206    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
207    {
208      rank=it->first;
209      traceOff();
210      MPI_Test(& it->second, &flag, &status);
211      traceOn();
212      if (flag==true)
213      {
214        buffers[rank]->updateCurrentWindows() ;
215        recvRequest.push_back(rank);
216        MPI_Get_count(&status,MPI_CHAR,&count);
217        processRequest(rank,bufferRequest[rank],count);
218      }
219    }
220
221    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
222    {
223      pendingRequest.erase(*itRecv);
224      bufferRequest.erase(*itRecv);
225    }
226  }
227
228  void CLegacyContextServer::getBufferFromClient(size_t timeLine)
229  {
230    CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ;
231    if (!isAttachedModeEnabled()) // one sided desactivated in attached mode
232    { 
233      int rank ;
234      char *buffer ;
235      size_t count ; 
236
237      if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
238      for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
239      {
240        rank=itLastTimeLine->first ;
241        if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
242        {
243          if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
244          if (count >= 0) ++itLastTimeLine ;
245          break ;
246        }
247      }
248    }
249    CTimer::get("CLegacyContextServer::getBufferFromClient").suspend() ;
250  }
251         
252       
253  void CLegacyContextServer::processRequest(int rank, char* buff,int count)
254  {
255
256    CBufferIn buffer(buff,count);
257    char* startBuffer,endBuffer;
258    int size, offset;
259    size_t timeLine=0;
260    map<size_t,CEventServer*>::iterator it;
261
262   
263    CTimer::get("Process request").resume();
264    while(count>0)
265    {
266      char* startBuffer=(char*)buffer.ptr();
267      CBufferIn newBuffer(startBuffer,buffer.remain());
268      newBuffer>>size>>timeLine;
269
270      if (timeLine==timelineEventNotifyChangeBufferSize)
271      {
272        buffers[rank]->notifyBufferResizing() ;
273        buffers[rank]->updateCurrentWindows() ;
274        buffers[rank]->popBuffer(count) ;
275        info(100)<<"Context id "<<context->getId()<<" : Receive NotifyChangeBufferSize from client rank "<<rank<<endl
276                 <<"isBufferEmpty ? "<<buffers[rank]->isBufferEmpty()<<"  remaining count : "<<buffers[rank]->getUsed()<<endl;
277      } 
278      else if (timeLine==timelineEventChangeBufferSize)
279      {
280        size_t newSize ;
281        vector<MPI_Aint> winAdress(2) ;
282        newBuffer>>newSize>>winAdress[0]>>winAdress[1] ;
283        buffers[rank]->freeBuffer(count) ;
284        delete buffers[rank] ;
285        buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, newSize) ;
286        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank
287                 <<"  newSize : "<<newSize<<" Address : "<<winAdress[0]<<" & "<<winAdress[1]<<endl ;
288      }
289      else
290      {
291        info(100)<<"Context id "<<context->getId()<<" : Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
292        it=events.find(timeLine);
293       
294        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
295        it->second->push(rank,buffers[rank],startBuffer,size);
296        if (timeLine>0) lastTimeLine[rank]=timeLine ;
297      }
298      buffer.advance(size);
299      count=buffer.remain();
300    }
301   
302    CTimer::get("Process request").suspend();
303  }
304
305  void CLegacyContextServer::processEvents(bool enableEventsProcessing)
306  {
307    map<size_t,CEventServer*>::iterator it;
308    CEventServer* event;
309   
310//    if (context->isProcessingEvent()) return ;
311    if (isProcessingEvent_) return ;
312    if (isAttachedModeEnabled())
313      if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ;
314
315    it=events.find(currentTimeLine);
316    if (it!=events.end())
317    {
318      event=it->second;
319
320      if (event->isFull())
321      {
322        if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side
323        {
324          eventScheduler_->registerEvent(currentTimeLine,hashId);
325          scheduled=true;
326        }
327        else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) )
328        {
329          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ;
330
331          if (!eventScheduled_) 
332          {
333            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
334            eventScheduled_=true ;
335            return ;
336          }
337          else 
338          {
339            MPI_Status status ;
340            int flag ;
341            MPI_Test(&processEventRequest_, &flag, &status) ;
342            if (!flag) return ;
343            eventScheduled_=false ;
344          }
345
346          if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ;
347          //MPI_Barrier(intraComm) ;
348         // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes
349         // The best way to properly solve this problem will be to use the event scheduler also in attached mode
350         // for now just set up a MPI barrier
351//ym to be check later
352//         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ;
353
354//         context->setProcessingEvent() ;
355         isProcessingEvent_=true ;
356         CTimer::get("Process events").resume();
357         info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
358         dispatchEvent(*event);
359         CTimer::get("Process events").suspend();
360         isProcessingEvent_=false ;
361//         context->unsetProcessingEvent() ;
362         pendingEvent=false;
363         delete event;
364         events.erase(it);
365         currentTimeLine++;
366         scheduled = false;
367         if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ;
368        }
369      }
370      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
371    }
372    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
373  }
374
375  CLegacyContextServer::~CLegacyContextServer()
376  {
377    map<int,CServerBuffer*>::iterator it;
378    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
379    buffers.clear() ;
380  }
381
382  void CLegacyContextServer::releaseBuffers()
383  {
384    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
385    //buffers.clear() ;
386    freeWindows() ;
387  }
388
389  void CLegacyContextServer::freeWindows()
390  {
391    //if (!isAttachedModeEnabled())
392    //{
393    //  for(auto& it : winComm_)
394    //  {
395    //    int rank = it.first ;
396    //    MPI_Win_free(&windows_[rank][0]);
397    //    MPI_Win_free(&windows_[rank][1]);
398    //    MPI_Comm_free(&winComm_[rank]) ;
399    //  }
400    //}
401  }
402
403  void CLegacyContextServer::notifyClientsFinalize(void)
404  {
405    for(auto it=buffers.begin();it!=buffers.end();++it)
406    {
407      it->second->notifyClientFinalize() ;
408    }
409  }
410
411  void CLegacyContextServer::dispatchEvent(CEventServer& event)
412  {
413    string contextName;
414    string buff;
415    int MsgSize;
416    int rank;
417    list<CEventServer::SSubEvent>::iterator it;
418    StdString ctxId = context->getId();
419    CContext::setCurrent(ctxId);
420    StdSize totalBuf = 0;
421
422    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
423    {
424      finished=true;
425      info(20)<<" CLegacyContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
426      notifyClientsFinalize() ;
427      CTimer::get("receiving requests").suspend();
428      context->finalize();
429     
430      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
431                           iteMap = mapBufferSize_.end(), itMap;
432      for (itMap = itbMap; itMap != iteMap; ++itMap)
433      {
434        rank = itMap->first;
435        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
436            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
437        totalBuf += itMap->second;
438      }
439      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
440    }
441    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
442    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
443    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
444    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
445    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
446    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
447    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
448    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
449    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
450    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
451    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
452    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
453    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
454    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
455    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
456    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
457    else
458    {
459      ERROR("void CLegacyContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
460    }
461  }
462
463  bool CLegacyContextServer::isCollectiveEvent(CEventServer& event)
464  {
465    if (event.type>1000) return false ;
466    else return true ;
467  }
468}
Note: See TracBrowser for help on using the repository browser.