source: XIOS3/trunk/src/transport/legacy_context_server.cpp @ 2595

Last change on this file since 2595 was 2595, checked in by jderouillat, 7 months ago

Check that all events are managed before the end of the simulation

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 16.5 KB
Line 
1#include "legacy_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
33  CLegacyContextServer::CLegacyContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
34    : CContextServer(parent, intraComm_, interComm_),
35      isProcessingEvent_(false)
36  {
37 
38    xios::MPI_Comm_dup(intraComm, &processEventBarrier_) ;
39    CXios::getMpiGarbageCollector().registerCommunicator(processEventBarrier_) ;
40
41    currentTimeLine=1;
42    scheduled=false;
43    finished=false;
44
45    xios::MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
46    CXios::getMpiGarbageCollector().registerCommunicator(interCommMerged_) ;
47    xios::MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows
48    CXios::getMpiGarbageCollector().registerCommunicator(commSelf_) ;
49 
50    itLastTimeLine=lastTimeLine.begin() ;
51
52    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
53  }
54 
55  void CLegacyContextServer::setPendingEvent(void)
56  {
57    pendingEvent=true;
58  }
59
60  bool CLegacyContextServer::hasPendingEvent(void)
61  {
62    return (pendingRequest.size()!=0);
63  }
64
65  bool CLegacyContextServer::hasFinished(void)
66  {
67    return finished;
68  }
69
70  bool CLegacyContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
71  {
72    CTimer::get("listen request").resume();
73    listen();
74    CTimer::get("listen request").suspend();
75    CTimer::get("check pending request").resume();
76    checkPendingRequest();
77    checkPendingProbe() ;
78    CTimer::get("check pending request").suspend();
79    CTimer::get("check event process").resume();
80    processEvents(enableEventsProcessing);
81    CTimer::get("check event process").suspend();
82    return finished;
83  }
84
85 void CLegacyContextServer::listen(void)
86  {
87    int rank;
88    int flag;
89    int count;
90    char * addr;
91    MPI_Status status;
92    MPI_Message message ;
93    map<int,CServerBuffer*>::iterator it;
94    bool okLoop;
95
96    traceOff();
97    MPI_Improbe(MPI_ANY_SOURCE, 20, interCommMerged_,&flag,&message, &status);
98    traceOn();
99    if (flag==true) listenPendingRequest(message, status) ;
100  }
101
102  bool CLegacyContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
103  {
104    int count;
105    char * addr;
106    map<int,CServerBuffer*>::iterator it;
107    int rank=status.MPI_SOURCE ;
108
109    it=buffers.find(rank);
110    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
111    {
112      MPI_Aint recvBuff[4] ;
113      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
114      remoteHashId_ = recvBuff[0] ;
115      StdSize buffSize = recvBuff[1];
116      vector<MPI_Aint> winBufferAddress(2) ;
117      winBufferAddress[0]=recvBuff[2] ; winBufferAddress[1]=recvBuff[3] ;
118      mapBufferSize_.insert(std::make_pair(rank, buffSize));
119
120      // create windows dynamically for one-sided
121      int dummy ;
122      MPI_Send(&dummy, 0, MPI_INT, rank, 21,interCommMerged_) ;
123      CTimer::get("create Windows").resume() ;
124      MPI_Comm interComm ;
125      int tag = 0 ;
126      xios::MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, tag , &interComm) ;
127      xios::MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
128      xios::MPI_Comm_free(&interComm) ;
129      windows_[rank].resize(2) ;
130      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
131      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
132      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
133      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
134      windows_[rank][0] = new CWindowDynamic() ;
135      windows_[rank][1] = new CWindowDynamic() ;
136      windows_[rank][0] -> create(winComm_[rank]) ;
137      windows_[rank][1] -> create(winComm_[rank]) ;
138      windows_[rank][0] -> setWinBufferAddress(winBufferAddress[0],0) ;
139      windows_[rank][1] -> setWinBufferAddress(winBufferAddress[1],0) ;
140      CTimer::get("create Windows").suspend() ;
141      CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
142      MPI_Barrier(winComm_[rank]) ;
143
144      it=(buffers.insert(pair<int,CServerBuffer*>(rank, new CServerBuffer(rank, windows_[rank], winBufferAddress, 0, buffSize)))).first;
145      lastTimeLine[rank]=0 ;
146      itLastTimeLine=lastTimeLine.begin() ;
147
148      return true;
149    }
150    else
151    {
152        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
153        pendingProbe[rank].push_back(mypair) ;
154        return false;
155    }
156  }
157
158  void CLegacyContextServer::checkPendingProbe(void)
159  {
160   
161    list<int> recvProbe ;
162    list<int>::iterator itRecv ;
163    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
164
165    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
166    {
167      int rank=itProbe->first ;
168      if (pendingRequest.count(rank)==0)
169      {
170        MPI_Message& message = itProbe->second.front().first ;
171        MPI_Status& status = itProbe->second.front().second ;
172        int count ;
173        MPI_Get_count(&status,MPI_CHAR,&count);
174        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
175        if ( (it->second->isBufferFree(count) && !it->second->isResizing()) // accept new request if buffer is free
176          || (it->second->isResizing() && it->second->isBufferEmpty()) )    // or if resizing wait for buffer is empty
177        {
178          char * addr;
179          addr=(char*)it->second->getBuffer(count);
180          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
181          bufferRequest[rank]=addr;
182          recvProbe.push_back(rank) ;
183          itProbe->second.pop_front() ;
184        }
185      }
186    }
187
188    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
189  }
190
191
192  void CLegacyContextServer::checkPendingRequest(void)
193  {
194    map<int,MPI_Request>::iterator it;
195    list<int> recvRequest;
196    list<int>::iterator itRecv;
197    int rank;
198    int flag;
199    int count;
200    MPI_Status status;
201   
202    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
203    else CTimer::get("receiving requests").suspend();
204
205    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
206    {
207      rank=it->first;
208      traceOff();
209      MPI_Test(& it->second, &flag, &status);
210      traceOn();
211      if (flag==true)
212      {
213        buffers[rank]->updateCurrentWindows() ;
214        recvRequest.push_back(rank);
215        MPI_Get_count(&status,MPI_CHAR,&count);
216        processRequest(rank,bufferRequest[rank],count);
217      }
218    }
219
220    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
221    {
222      pendingRequest.erase(*itRecv);
223      bufferRequest.erase(*itRecv);
224    }
225  }
226
227  void CLegacyContextServer::getBufferFromClient(size_t timeLine)
228  {
229    CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ;
230
231    int rank ;
232    char *buffer ;
233    size_t count ; 
234
235    if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
236    for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
237    {
238      rank=itLastTimeLine->first ;
239      if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
240      {
241        if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
242        if (count >= 0) ++itLastTimeLine ;
243        break ;
244      }
245    }
246    CTimer::get("CLegacyContextServer::getBufferFromClient").suspend() ;
247  }
248         
249       
250  void CLegacyContextServer::processRequest(int rank, char* buff,int count)
251  {
252
253    CBufferIn buffer(buff,count);
254    char* startBuffer,endBuffer;
255    int size, offset;
256    size_t timeLine=0;
257    map<size_t,CEventServer*>::iterator it;
258
259   
260    CTimer::get("Process request").resume();
261    while(count>0)
262    {
263      char* startBuffer=(char*)buffer.ptr();
264      CBufferIn newBuffer(startBuffer,buffer.remain());
265      newBuffer>>size>>timeLine;
266
267      if (timeLine==timelineEventNotifyChangeBufferSize)
268      {
269        buffers[rank]->notifyBufferResizing() ;
270        buffers[rank]->updateCurrentWindows() ;
271        buffers[rank]->popBuffer(count) ;
272        info(100)<<"Context id "<<context->getId()<<" : Receive NotifyChangeBufferSize from client rank "<<rank<<endl
273                 <<"isBufferEmpty ? "<<buffers[rank]->isBufferEmpty()<<"  remaining count : "<<buffers[rank]->getUsed()<<endl;
274      } 
275      else if (timeLine==timelineEventChangeBufferSize)
276      {
277        size_t newSize ;
278        vector<MPI_Aint> winBufferAdress(2) ;
279        newBuffer>>newSize>>winBufferAdress[0]>>winBufferAdress[1] ;
280        buffers[rank]->freeBuffer(count) ;
281        delete buffers[rank] ;
282        windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ;
283        windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ;
284        buffers[rank] = new CServerBuffer(rank, windows_[rank], winBufferAdress, 0, newSize) ;
285        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank
286                 <<"  newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ;
287      }
288      else
289      {
290        info(100)<<"Context id "<<context->getId()<<" : Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
291        it=events.find(timeLine);
292       
293        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
294        it->second->push(rank,buffers[rank],startBuffer,size);
295        if (timeLine>0) lastTimeLine[rank]=timeLine ;
296      }
297      buffer.advance(size);
298      count=buffer.remain();
299    }
300   
301    CTimer::get("Process request").suspend();
302  }
303
304  void CLegacyContextServer::processEvents(bool enableEventsProcessing)
305  {
306    map<size_t,CEventServer*>::iterator it;
307    CEventServer* event;
308   
309    if (isProcessingEvent_) return ;
310
311    it=events.find(currentTimeLine);
312    if (it!=events.end())
313    {
314      event=it->second;
315
316      if (event->isFull())
317      {
318        if (!scheduled)
319        {
320          eventScheduler_->registerEvent(currentTimeLine,hashId);
321          info(100)<<"Context id "<<context->getId()<<"Schedule event : "<< currentTimeLine <<"  "<<hashId<<endl ;
322          scheduled=true;
323        }
324        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) )
325        {
326          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ;
327
328          if (!eventScheduled_) 
329          {
330            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
331            eventScheduled_=true ;
332            return ;
333          }
334          else 
335          {
336            MPI_Status status ;
337            int flag ;
338            MPI_Test(&processEventRequest_, &flag, &status) ;
339            if (!flag) return ;
340            eventScheduled_=false ;
341          }
342         
343          if (CXios::checkEventSync && context->getServiceType()!=CServicesManager::CLIENT)
344          {
345            int typeId, classId, typeId_in, classId_in;
346            long long timeLine_out;
347            long long timeLine_in( currentTimeLine );
348            typeId_in=event->type ;
349            classId_in=event->classId ;
350   //        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
351            MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
352            MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
353            MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
354            if (typeId/intraCommSize!=event->type || classId/intraCommSize!=event->classId || timeLine_out/intraCommSize!=currentTimeLine)
355            {
356               ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
357                  << "Event are not coherent between client for timeline = "<<currentTimeLine);
358            }
359          }
360
361          isProcessingEvent_=true ;
362          CTimer::get("Process events").resume();
363          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
364          eventScheduler_->popEvent() ;
365          dispatchEvent(*event);
366          CTimer::get("Process events").suspend();
367          isProcessingEvent_=false ;
368          pendingEvent=false;
369          delete event;
370          events.erase(it);
371          currentTimeLine++;
372          scheduled = false;
373        }
374      }
375      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
376    }
377    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
378  }
379
380  CLegacyContextServer::~CLegacyContextServer()
381  {
382    map<int,CServerBuffer*>::iterator it;
383    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
384    buffers.clear() ;
385  }
386
387  void CLegacyContextServer::releaseBuffers()
388  {
389    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
390    //buffers.clear() ;
391    freeWindows() ;
392  }
393
394  void CLegacyContextServer::freeWindows()
395  {
396    for(auto& it : winComm_)
397    {
398      int rank = it.first ;
399      delete windows_[rank][0];
400      delete windows_[rank][1];
401    }
402  }
403
404  void CLegacyContextServer::notifyClientsFinalize(void)
405  {
406    for(auto it=buffers.begin();it!=buffers.end();++it)
407    {
408      it->second->notifyClientFinalize() ;
409    }
410  }
411
412  void CLegacyContextServer::dispatchEvent(CEventServer& event)
413  {
414    string contextName;
415    string buff;
416    int MsgSize;
417    int rank;
418    list<CEventServer::SSubEvent>::iterator it;
419    StdString ctxId = context->getId();
420    CContext::setCurrent(ctxId);
421    StdSize totalBuf = 0;
422
423    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
424    {
425      finished=true;
426      info(20)<<" CLegacyContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
427      notifyClientsFinalize() ;
428      CTimer::get("receiving requests").suspend();
429      context->finalize();
430     
431      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
432                           iteMap = mapBufferSize_.end(), itMap;
433      for (itMap = itbMap; itMap != iteMap; ++itMap)
434      {
435        rank = itMap->first;
436        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
437            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
438        totalBuf += itMap->second;
439      }
440      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
441    }
442    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
443    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
444    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
445    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
446    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
447    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
448    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
449    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
450    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
451    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
452    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
453    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
454    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
455    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
456    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
457    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
458    else
459    {
460      ERROR("void CLegacyContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
461    }
462  }
463
464  bool CLegacyContextServer::isCollectiveEvent(CEventServer& event)
465  {
466    if (event.type>1000) return false ;
467    else return true ;
468  }
469}
Note: See TracBrowser for help on using the repository browser.