source: XIOS3/trunk/src/transport/p2p_context_server.cpp

Last change on this file was 2629, checked in by jderouillat, 6 weeks ago

Delete boost dependencies; the few features used are replaced by functions stored in extern/boost_extraction

  • Property svn:executable set to *
File size: 10.8 KB
1#include "p2p_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <random>
25#include <chrono>
26

namespace xios
{
  using namespace std ;
  extern CLogType logTimers ;
  extern CLogType logProfile ;

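  // Construction duplicates the intra-communicator for the event-processing barrier, merges
  // the client/server inter-communicator, and splits off a per-process "self" communicator;
  // all of them are registered with the MPI garbage collector for later release.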
  CP2pContextServer::CP2pContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_)
                         : CContextServer(parent, intraComm_, interComm_),
                           isProcessingEvent_(false)
  {

    xios::MPI_Comm_dup(intraComm, &processEventBarrier_) ;
    CXios::getMpiGarbageCollector().registerCommunicator(processEventBarrier_) ;

    currentTimeLine=1;
    scheduled=false;
    finished=false;

    xios::MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
    CXios::getMpiGarbageCollector().registerCommunicator(interCommMerged_) ;
    xios::MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows
    CXios::getMpiGarbageCollector().registerCommunicator(commSelf_) ;

    itLastTimeLine=lastTimeLine.begin() ;

    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)

  }

  void CP2pContextServer::setPendingEvent(void)
  {
    pendingEvent=true;
  }

  bool CP2pContextServer::hasPendingEvent(void)
  {
    return ((pendingEvents_.size()!=0)||(completedEvents_.size()!=0));
  }

  bool CP2pContextServer::hasFinished(void)
  {
    return finished;
  }

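  // One pass of the server receive loop: probe for new client requests, complete the ones
  // already queued, advance the buffers of pending events, then attempt to process the event
  // for the current timeline. Returns true once the context has finished.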
  bool CP2pContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
  {
    if (info.isActive(logProfile)) CTimer::get("Recv event loop (p2p)").resume();
    if (info.isActive(logTimers)) CTimer::get("listen request").resume();
    listen();
    if (info.isActive(logTimers)) CTimer::get("listen request").suspend();

    if (info.isActive(logTimers)) CTimer::get("listen pending request").resume();
    listenPendingRequest() ;
    if (info.isActive(logTimers)) CTimer::get("listen pending request").suspend();

    if (info.isActive(logTimers)) CTimer::get("check server Buffers").resume();
    checkBuffers() ;
    if (info.isActive(logTimers)) CTimer::get("check server Buffers").suspend();

    if (info.isActive(logTimers)) CTimer::get("check event process").resume();
    processEvents(enableEventsProcessing);
    if (info.isActive(logTimers)) CTimer::get("check event process").suspend();
    if (info.isActive(logProfile)) CTimer::get("Recv event loop (p2p)").suspend();
    return finished;

  }

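  // Probe the merged communicator for incoming client messages (tag 20) and queue them as
  // CRequest objects per client rank. Only the head of a rank's queue is tested here so that
  // requests from a given client are always treated in arrival order.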
  void CP2pContextServer::listen(void)
  {
    int rank;
    int flag;
    MPI_Status status;
    flag=true ;

    while(flag)
    {
      traceOff();
      MPI_Iprobe(MPI_ANY_SOURCE, 20,interCommMerged_, &flag, &status);

      traceOn();
      if (flag==true)
      {
        int rank=status.MPI_SOURCE ;
        auto& rankRequests = requests_[rank];
        rankRequests.push_back(new CRequest(interCommMerged_, status)) ;
        // Test 1st request of the list, request treatment must be ordered
        if (rankRequests.front()->test())
        {
          processRequest( *(rankRequests.front()) );
          delete rankRequests.front();
          rankRequests.pop_front() ;
        }
      }
    }
  }

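  // For every client rank, process queued requests in order as long as the oldest one has
  // completed.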
  void CP2pContextServer::listenPendingRequest(void)
  {
    for(auto it_rank=requests_.begin() ; it_rank!=requests_.end() ; ++it_rank)
    {
      int rank = it_rank->first;
      auto& rankRequests = it_rank->second;
      while ( (!rankRequests.empty()) && (rankRequests.front()->test()) )
      {
        processRequest( *(rankRequests.front()) );
        delete rankRequests.front();
        rankRequests.pop_front() ;
      }
    }
  }

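  // Deliver a completed request to the buffer of the corresponding client rank, creating the
  // CP2pServerBuffer on first contact with that rank.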
  void CP2pContextServer::processRequest(CRequest& request)
  {
    int rank = request.getRank() ;
    auto it=buffers_.find(rank);
    if (it==buffers_.end())
    {
      buffers_[rank] = new CP2pServerBuffer(rank, commSelf_, interCommMerged_, pendingEvents_, completedEvents_, request.getBuffer()) ;
    }
    else
    {
      it->second->receivedRequest(request.getBuffer()) ;
    }
  }

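  // Let every pending event advance its buffers; events whose sender count has reached zero
  // are removed from the pending list.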
  void CP2pContextServer::checkBuffers(void)
  {
    if (!pendingEvents_.empty())
    {
/*
      SPendingEvent& nextEvent = pendingEvents_.begin()->second ;
      for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ;
      if (nextEvent.nbSenders==0) pendingEvents_.erase(pendingEvents_.begin()) ;
*/
      for(auto it=pendingEvents_.begin() ;  it!=pendingEvents_.end() ;)
      {
        SPendingEvent& nextEvent = it->second ;
        for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ;
        if (nextEvent.nbSenders==0) it=pendingEvents_.erase(it) ;
        else ++it ;
      }
    }
  }


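  // Process the event attached to the current timeline step, provided all senders have
  // contributed, the event scheduler has granted its turn, and the non-blocking barrier on
  // processEventBarrier_ has completed. The event is then assembled from the buffers,
  // dispatched, and the timeline advances.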
  void CP2pContextServer::processEvents(bool enableEventsProcessing)
  {

    if (isProcessingEvent_) return ;

    auto it=completedEvents_.find(currentTimeLine);

    if (it!=completedEvents_.end())
    {
      if (it->second.nbSenders == it->second.currentNbSenders)
      {
        if (!scheduled)
        {
          eventScheduler_->registerEvent(currentTimeLine,hashId);
          scheduled=true;
        }
        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) )
        {
          //if (!enableEventsProcessing && isCollectiveEvent(event)) return ;

          if (!eventScheduled_)
          {
            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
            eventScheduled_=true ;
            return ;
          }
          else
          {
            MPI_Status status ;
            int flag ;
            MPI_Test(&processEventRequest_, &flag, &status) ;
            if (!flag) return ;
            eventScheduled_=false ;
          }

          eventScheduler_->popEvent() ;

          isProcessingEvent_=true ;
          CEventServer event(this) ;
          for(auto& buffer : it->second.buffers) buffer->fillEventServer(currentTimeLine, event) ;
//          MPI_Barrier(intraComm) ;
          CTimer::get("Process events").resume();
          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event.classId<<" of type "<<event.type<<endl ;
          dispatchEvent(event);
          CTimer::get("Process events").suspend();
          isProcessingEvent_=false ;
//         context->unsetProcessingEvent() ;
          pendingEvent=false;
          completedEvents_.erase(it);
          currentTimeLine++;
          scheduled = false;
        }
      }
    }
  }

  CP2pContextServer::~CP2pContextServer()
  {
    for(auto& buffer : buffers_) delete buffer.second;
    buffers_.clear() ;
  }

  void CP2pContextServer::releaseBuffers()
  {
    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
    //buffers.clear() ;
    freeWindows() ;
  }

  void CP2pContextServer::freeWindows()
  {
    //  for(auto& it : winComm_)
    //  {
    //    int rank = it.first ;
    //    MPI_Win_free(&windows_[rank][0]);
    //    MPI_Win_free(&windows_[rank][1]);
    //    xios::MPI_Comm_free(&winComm_[rank]) ;
    //  }
  }

  void CP2pContextServer::notifyClientsFinalize(void)
  {
    for(auto it=buffers_.begin();it!=buffers_.end();++it)
    {
      it->second->notifyClientFinalize() ;
    }
  }

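  // Route an assembled event to the class identified by its classId. A context-finalize event
  // additionally notifies the clients, finalizes the context, and reports per-client buffer
  // memory usage.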
  void CP2pContextServer::dispatchEvent(CEventServer& event)
  {
    string contextName;
    string buff;
    int MsgSize;
    int rank;
    list<CEventServer::SSubEvent>::iterator it;
    StdString ctxId = context->getId();
    CContext::setCurrent(ctxId);
    StdSize totalBuf = 0;

    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
    {
      CTimer::get("Context finalize").resume();
      finished=true;
      info(20)<<" CP2pContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
      notifyClientsFinalize() ;
      if (info.isActive(logTimers)) CTimer::get("receiving requests").suspend();
      context->finalize();

      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
                           iteMap = mapBufferSize_.end(), itMap;
      for (itMap = itbMap; itMap != iteMap; ++itMap)
      {
        rank = itMap->first;
        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
        totalBuf += itMap->second;
      }
      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
      CTimer::get("Context finalize").suspend();
    }
    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
    else if (event.classId==CField::GetType())
    {
      if (event.type==CField::EVENT_ID_UPDATE_DATA) CField::dispatchEvent(event);
      else CField::dispatchEvent(event);
    }
    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
    else
    {
      ERROR("void CP2pContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
    }
  }

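  // Events with a type identifier above 1000 are considered non-collective.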
  bool CP2pContextServer::isCollectiveEvent(CEventServer& event)
  {
    if (event.type>1000) return false ;
    else return true ;
  }
}