source: XIOS3/trunk/src/transport/legacy_context_server.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 7 weeks ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 17.3 KB
Line 
1#include "legacy_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32  extern CLogType logTimers ;
33  extern CLogType logProfile ;
34
35  CLegacyContextServer::CLegacyContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
36    : CContextServer(parent, intraComm_, interComm_),
37      isProcessingEvent_(false)
38  {
39 
40    xios::MPI_Comm_dup(intraComm, &processEventBarrier_) ;
41    CXios::getMpiGarbageCollector().registerCommunicator(processEventBarrier_) ;
42
43    currentTimeLine=1;
44    scheduled=false;
45    finished=false;
46
47    xios::MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
48    CXios::getMpiGarbageCollector().registerCommunicator(interCommMerged_) ;
49    xios::MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows
50    CXios::getMpiGarbageCollector().registerCommunicator(commSelf_) ;
51 
52    itLastTimeLine=lastTimeLine.begin() ;
53
54    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
55  }
56 
57  void CLegacyContextServer::setPendingEvent(void)
58  {
59    pendingEvent=true;
60  }
61
62  bool CLegacyContextServer::hasPendingEvent(void)
63  {
64    return (pendingRequest.size()!=0);
65  }
66
67  bool CLegacyContextServer::hasFinished(void)
68  {
69    return finished;
70  }
71
72  bool CLegacyContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
73  {
74    if (info.isActive(logProfile)) CTimer::get("Recv event loop (legacy)").resume();
75    if (info.isActive(logTimers)) CTimer::get("listen request").resume();
76    listen();
77    if (info.isActive(logTimers)) CTimer::get("listen request").suspend();
78    if (info.isActive(logTimers)) CTimer::get("check pending request").resume();
79    checkPendingRequest();
80    checkPendingProbe() ;
81    if (info.isActive(logTimers)) CTimer::get("check pending request").suspend();
82    if (info.isActive(logTimers)) CTimer::get("check event process").resume();
83    processEvents(enableEventsProcessing);
84    if (info.isActive(logTimers)) CTimer::get("check event process").suspend();
85    if (info.isActive(logProfile)) CTimer::get("Recv event loop (legacy)").suspend();
86    return finished;
87  }
88
89 void CLegacyContextServer::listen(void)
90  {
91    int rank;
92    int flag;
93    int count;
94    char * addr;
95    MPI_Status status;
96    MPI_Message message ;
97    map<int,CServerBuffer*>::iterator it;
98    bool okLoop;
99
100    traceOff();
101    MPI_Improbe(MPI_ANY_SOURCE, 20, interCommMerged_,&flag,&message, &status);
102    traceOn();
103    if (flag==true) listenPendingRequest(message, status) ;
104  }
105
106  bool CLegacyContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
107  {
108    int count;
109    char * addr;
110    map<int,CServerBuffer*>::iterator it;
111    int rank=status.MPI_SOURCE ;
112
113    it=buffers.find(rank);
114    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
115    {
116      MPI_Aint recvBuff[4] ;
117      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
118      remoteHashId_ = recvBuff[0] ;
119      StdSize buffSize = recvBuff[1];
120      vector<MPI_Aint> winBufferAddress(2) ;
121      winBufferAddress[0]=recvBuff[2] ; winBufferAddress[1]=recvBuff[3] ;
122      mapBufferSize_.insert(std::make_pair(rank, buffSize));
123
124      // create windows dynamically for one-sided
125      int dummy ;
126      MPI_Send(&dummy, 0, MPI_INT, rank, 21,interCommMerged_) ;
127      if (info.isActive(logTimers)) CTimer::get("create Windows").resume() ;
128      MPI_Comm interComm ;
129      int tag = 0 ;
130      xios::MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, tag , &interComm) ;
131      xios::MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
132      xios::MPI_Comm_free(&interComm) ;
133      windows_[rank].resize(2) ;
134      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
135      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
136      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
137      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
138      windows_[rank][0] = new CWindowDynamic() ;
139      windows_[rank][1] = new CWindowDynamic() ;
140      windows_[rank][0] -> create(winComm_[rank]) ;
141      windows_[rank][1] -> create(winComm_[rank]) ;
142      windows_[rank][0] -> setWinBufferAddress(winBufferAddress[0],0) ;
143      windows_[rank][1] -> setWinBufferAddress(winBufferAddress[1],0) ;
144      if (info.isActive(logTimers)) CTimer::get("create Windows").suspend() ;
145      CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
146      MPI_Barrier(winComm_[rank]) ;
147
148      it=(buffers.insert(pair<int,CServerBuffer*>(rank, new CServerBuffer(rank, windows_[rank], winBufferAddress, 0, buffSize)))).first;
149      lastTimeLine[rank]=0 ;
150      itLastTimeLine=lastTimeLine.begin() ;
151
152      return true;
153    }
154    else
155    {
156        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
157        pendingProbe[rank].push_back(mypair) ;
158        return false;
159    }
160  }
161
162  void CLegacyContextServer::checkPendingProbe(void)
163  {
164   
165    list<int> recvProbe ;
166    list<int>::iterator itRecv ;
167    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
168
169    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
170    {
171      int rank=itProbe->first ;
172      if (pendingRequest.count(rank)==0)
173      {
174        MPI_Message& message = itProbe->second.front().first ;
175        MPI_Status& status = itProbe->second.front().second ;
176        int count ;
177        MPI_Get_count(&status,MPI_CHAR,&count);
178        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
179        if ( (it->second->isBufferFree(count) && !it->second->isResizing()) // accept new request if buffer is free
180          || (it->second->isResizing() && it->second->isBufferEmpty()) )    // or if resizing wait for buffer is empty
181        {
182          char * addr;
183          addr=(char*)it->second->getBuffer(count);
184          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
185          bufferRequest[rank]=addr;
186          recvProbe.push_back(rank) ;
187          itProbe->second.pop_front() ;
188        }
189      }
190    }
191
192    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
193  }
194
195
196  void CLegacyContextServer::checkPendingRequest(void)
197  {
198    map<int,MPI_Request>::iterator it;
199    list<int> recvRequest;
200    list<int>::iterator itRecv;
201    int rank;
202    int flag;
203    int count;
204    MPI_Status status;
205   
206    if (!pendingRequest.empty()) if (info.isActive(logTimers)) CTimer::get("receiving requests").resume();
207    else if (info.isActive(logTimers)) CTimer::get("receiving requests").suspend();
208
209    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
210    {
211      rank=it->first;
212      traceOff();
213      MPI_Test(& it->second, &flag, &status);
214      traceOn();
215      if (flag==true)
216      {
217        buffers[rank]->updateCurrentWindows() ;
218        recvRequest.push_back(rank);
219        MPI_Get_count(&status,MPI_CHAR,&count);
220        processRequest(rank,bufferRequest[rank],count);
221      }
222    }
223
224    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
225    {
226      pendingRequest.erase(*itRecv);
227      bufferRequest.erase(*itRecv);
228    }
229  }
230
231  void CLegacyContextServer::getBufferFromClient(size_t timeLine)
232  {
233    if (info.isActive(logTimers)) CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ;
234
235    int rank ;
236    char *buffer ;
237    size_t count ; 
238
239    if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
240    for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
241    {
242      rank=itLastTimeLine->first ;
243      if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
244      {
245        if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
246        if (count >= 0) ++itLastTimeLine ;
247        break ;
248      }
249    }
250    if (info.isActive(logTimers)) CTimer::get("CLegacyContextServer::getBufferFromClient").suspend() ;
251  }
252         
253       
254  void CLegacyContextServer::processRequest(int rank, char* buff,int count)
255  {
256
257    CBufferIn buffer(buff,count);
258    char* startBuffer,endBuffer;
259    int size, offset;
260    size_t timeLine=0;
261    map<size_t,CEventServer*>::iterator it;
262
263   
264    CTimer::get("Process request").resume();
265    while(count>0)
266    {
267      char* startBuffer=(char*)buffer.ptr();
268      CBufferIn newBuffer(startBuffer,buffer.remain());
269      newBuffer>>size>>timeLine;
270
271      if (timeLine==timelineEventNotifyChangeBufferSize)
272      {
273        buffers[rank]->notifyBufferResizing() ;
274        buffers[rank]->updateCurrentWindows() ;
275        buffers[rank]->popBuffer(count) ;
276        info(100)<<"Context id "<<context->getId()<<" : Receive NotifyChangeBufferSize from client rank "<<rank<<endl
277                 <<"isBufferEmpty ? "<<buffers[rank]->isBufferEmpty()<<"  remaining count : "<<buffers[rank]->getUsed()<<endl;
278      } 
279      else if (timeLine==timelineEventChangeBufferSize)
280      {
281        size_t newSize ;
282        vector<MPI_Aint> winBufferAdress(2) ;
283        newBuffer>>newSize>>winBufferAdress[0]>>winBufferAdress[1] ;
284        buffers[rank]->freeBuffer(count) ;
285        delete buffers[rank] ;
286        windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ;
287        windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ;
288        buffers[rank] = new CServerBuffer(rank, windows_[rank], winBufferAdress, 0, newSize) ;
289        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank
290                 <<"  newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ;
291      }
292      else
293      {
294        info(100)<<"Context id "<<context->getId()<<" : Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
295        it=events.find(timeLine);
296       
297        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
298        it->second->push(rank,buffers[rank],startBuffer,size);
299        if (timeLine>0) lastTimeLine[rank]=timeLine ;
300      }
301      buffer.advance(size);
302      count=buffer.remain();
303    }
304   
305    CTimer::get("Process request").suspend();
306  }
307
308  void CLegacyContextServer::processEvents(bool enableEventsProcessing)
309  {
310    map<size_t,CEventServer*>::iterator it;
311    CEventServer* event;
312   
313    if (isProcessingEvent_) return ;
314
315    it=events.find(currentTimeLine);
316    if (it!=events.end())
317    {
318      event=it->second;
319
320      if (event->isFull())
321      {
322        if (!scheduled)
323        {
324          eventScheduler_->registerEvent(currentTimeLine,hashId);
325          info(100)<<"Context id "<<context->getId()<<"Schedule event : "<< currentTimeLine <<"  "<<hashId<<endl ;
326          scheduled=true;
327        }
328        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) )
329        {
330          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ;
331
332          if (!eventScheduled_) 
333          {
334            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
335            eventScheduled_=true ;
336            return ;
337          }
338          else 
339          {
340            MPI_Status status ;
341            int flag ;
342            MPI_Test(&processEventRequest_, &flag, &status) ;
343            if (!flag) return ;
344            eventScheduled_=false ;
345          }
346         
347          if (CXios::checkEventSync && context->getServiceType()!=CServicesManager::CLIENT)
348          {
349            int typeId, classId, typeId_in, classId_in;
350            long long timeLine_out;
351            long long timeLine_in( currentTimeLine );
352            typeId_in=event->type ;
353            classId_in=event->classId ;
354   //        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
355            MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
356            MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
357            MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
358            if (typeId/intraCommSize!=event->type || classId/intraCommSize!=event->classId || timeLine_out/intraCommSize!=currentTimeLine)
359            {
360               ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
361                  << "Event are not coherent between client for timeline = "<<currentTimeLine);
362            }
363          }
364
365          isProcessingEvent_=true ;
366          CTimer::get("Process events").resume();
367          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
368          eventScheduler_->popEvent() ;
369          dispatchEvent(*event);
370          CTimer::get("Process events").suspend();
371          isProcessingEvent_=false ;
372          pendingEvent=false;
373          delete event;
374          events.erase(it);
375          currentTimeLine++;
376          scheduled = false;
377        }
378      }
379      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
380    }
381    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
382  }
383
384  CLegacyContextServer::~CLegacyContextServer()
385  {
386    map<int,CServerBuffer*>::iterator it;
387    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
388    buffers.clear() ;
389  }
390
391  void CLegacyContextServer::releaseBuffers()
392  {
393    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
394    //buffers.clear() ;
395    freeWindows() ;
396  }
397
398  void CLegacyContextServer::freeWindows()
399  {
400    for(auto& it : winComm_)
401    {
402      int rank = it.first ;
403      delete windows_[rank][0];
404      delete windows_[rank][1];
405    }
406  }
407
408  void CLegacyContextServer::notifyClientsFinalize(void)
409  {
410    for(auto it=buffers.begin();it!=buffers.end();++it)
411    {
412      it->second->notifyClientFinalize() ;
413    }
414  }
415
416  void CLegacyContextServer::dispatchEvent(CEventServer& event)
417  {
418    string contextName;
419    string buff;
420    int MsgSize;
421    int rank;
422    list<CEventServer::SSubEvent>::iterator it;
423    StdString ctxId = context->getId();
424    CContext::setCurrent(ctxId);
425    StdSize totalBuf = 0;
426
427    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
428    {
429      if (info.isActive(logProfile)) CTimer::get("Context finalize").resume();
430      finished=true;
431      info(20)<<" CLegacyContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
432      notifyClientsFinalize() ;
433      if (info.isActive(logTimers)) CTimer::get("receiving requests").suspend();
434      context->finalize();
435     
436      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
437                           iteMap = mapBufferSize_.end(), itMap;
438      for (itMap = itbMap; itMap != iteMap; ++itMap)
439      {
440        rank = itMap->first;
441        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
442            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
443        totalBuf += itMap->second;
444      }
445      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
446      if (info.isActive(logProfile)) CTimer::get("Context finalize").suspend();
447    }
448    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
449    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
450    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
451    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
452    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
453    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
454    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
455    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
456    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
457    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
458    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
459    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
460    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
461    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
462    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
463    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
464    else
465    {
466      ERROR("void CLegacyContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
467    }
468  }
469
470  bool CLegacyContextServer::isCollectiveEvent(CEventServer& event)
471  {
472    if (event.type>1000) return false ;
473    else return true ;
474  }
475}
Note: See TracBrowser for help on using the repository browser.