source: XIOS3/trunk/src/transport/legacy_context_server.cpp @ 2547

Last change on this file since 2547 was 2547, checked in by ymipsl, 9 months ago

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 16.2 KB
Line 
1#include "legacy_context_server.hpp"
2#include "buffer_in.hpp"
3#include "type.hpp"
4#include "context.hpp"
5#include "object_template.hpp"
6#include "group_template.hpp"
7#include "attribute_template.hpp"
8#include "domain.hpp"
9#include "field.hpp"
10#include "file.hpp"
11#include "grid.hpp"
12#include "mpi.hpp"
13#include "tracer.hpp"
14#include "timer.hpp"
15#include "cxios.hpp"
16#include "event_scheduler.hpp"
17#include "server.hpp"
18#include "servers_ressource.hpp"
19#include "pool_ressource.hpp"
20#include "services.hpp"
21#include "contexts_manager.hpp"
22#include "timeline_events.hpp"
23
24#include <boost/functional/hash.hpp>
25#include <random>
26#include <chrono>
27
28
29namespace xios
30{
31  using namespace std ;
32
33  CLegacyContextServer::CLegacyContextServer(CContext* parent,MPI_Comm intraComm_,MPI_Comm interComm_) 
34    : CContextServer(parent, intraComm_, interComm_),
35      isProcessingEvent_(false)
36  {
37 
38    MPI_Comm_dup(intraComm, &processEventBarrier_) ;
39 
40
41    currentTimeLine=1;
42    scheduled=false;
43    finished=false;
44
45    MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
46    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows
47   
48    itLastTimeLine=lastTimeLine.begin() ;
49
50    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test)
51  }
52 
53  void CLegacyContextServer::setPendingEvent(void)
54  {
55    pendingEvent=true;
56  }
57
58  bool CLegacyContextServer::hasPendingEvent(void)
59  {
60    return pendingEvent;
61  }
62
63  bool CLegacyContextServer::hasFinished(void)
64  {
65    return finished;
66  }
67
68  bool CLegacyContextServer::eventLoop(bool enableEventsProcessing /*= true*/)
69  {
70    CTimer::get("listen request").resume();
71    listen();
72    CTimer::get("listen request").suspend();
73    CTimer::get("check pending request").resume();
74    checkPendingRequest();
75    checkPendingProbe() ;
76    CTimer::get("check pending request").suspend();
77    CTimer::get("check event process").resume();
78    processEvents(enableEventsProcessing);
79    CTimer::get("check event process").suspend();
80    return finished;
81  }
82
83 void CLegacyContextServer::listen(void)
84  {
85    int rank;
86    int flag;
87    int count;
88    char * addr;
89    MPI_Status status;
90    MPI_Message message ;
91    map<int,CServerBuffer*>::iterator it;
92    bool okLoop;
93
94    traceOff();
95    MPI_Improbe(MPI_ANY_SOURCE, 20, interCommMerged_,&flag,&message, &status);
96    traceOn();
97    if (flag==true) listenPendingRequest(message, status) ;
98  }
99
100  bool CLegacyContextServer::listenPendingRequest( MPI_Message &message, MPI_Status& status)
101  {
102    int count;
103    char * addr;
104    map<int,CServerBuffer*>::iterator it;
105    int rank=status.MPI_SOURCE ;
106
107    it=buffers.find(rank);
108    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
109    {
110      MPI_Aint recvBuff[4] ;
111      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
112      remoteHashId_ = recvBuff[0] ;
113      StdSize buffSize = recvBuff[1];
114      vector<MPI_Aint> winBufferAddress(2) ;
115      winBufferAddress[0]=recvBuff[2] ; winBufferAddress[1]=recvBuff[3] ;
116      mapBufferSize_.insert(std::make_pair(rank, buffSize));
117
118      // create windows dynamically for one-sided
119      int dummy ;
120      MPI_Send(&dummy, 0, MPI_INT, rank, 21,interCommMerged_) ;
121      CTimer::get("create Windows").resume() ;
122      MPI_Comm interComm ;
123      int tag = 0 ;
124      MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, tag , &interComm) ;
125      MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
126      MPI_Comm_free(&interComm) ;
127      windows_[rank].resize(2) ;
128      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
129      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ;
130      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
131      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ;
132      windows_[rank][0] = new CWindowDynamic() ;
133      windows_[rank][1] = new CWindowDynamic() ;
134      windows_[rank][0] -> create(winComm_[rank]) ;
135      windows_[rank][1] -> create(winComm_[rank]) ;
136      windows_[rank][0] -> setWinBufferAddress(winBufferAddress[0],0) ;
137      windows_[rank][1] -> setWinBufferAddress(winBufferAddress[1],0) ;
138      CTimer::get("create Windows").suspend() ;
139      CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ;
140      MPI_Barrier(winComm_[rank]) ;
141
142      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winBufferAddress, 0, buffSize)))).first;
143      lastTimeLine[rank]=0 ;
144      itLastTimeLine=lastTimeLine.begin() ;
145
146      return true;
147    }
148    else
149    {
150        std::pair<MPI_Message,MPI_Status> mypair(message,status) ;
151        pendingProbe[rank].push_back(mypair) ;
152        return false;
153    }
154  }
155
156  void CLegacyContextServer::checkPendingProbe(void)
157  {
158   
159    list<int> recvProbe ;
160    list<int>::iterator itRecv ;
161    map<int, list<std::pair<MPI_Message,MPI_Status> > >::iterator itProbe;
162
163    for(itProbe=pendingProbe.begin();itProbe!=pendingProbe.end();itProbe++)
164    {
165      int rank=itProbe->first ;
166      if (pendingRequest.count(rank)==0)
167      {
168        MPI_Message& message = itProbe->second.front().first ;
169        MPI_Status& status = itProbe->second.front().second ;
170        int count ;
171        MPI_Get_count(&status,MPI_CHAR,&count);
172        map<int,CServerBuffer*>::iterator it = buffers.find(rank);
173        if ( (it->second->isBufferFree(count) && !it->second->isResizing()) // accept new request if buffer is free
174          || (it->second->isResizing() && it->second->isBufferEmpty()) )    // or if resizing wait for buffer is empty
175        {
176          char * addr;
177          addr=(char*)it->second->getBuffer(count);
178          MPI_Imrecv(addr,count,MPI_CHAR, &message, &pendingRequest[rank]);
179          bufferRequest[rank]=addr;
180          recvProbe.push_back(rank) ;
181          itProbe->second.pop_front() ;
182        }
183      }
184    }
185
186    for(itRecv=recvProbe.begin(); itRecv!=recvProbe.end(); itRecv++) if (pendingProbe[*itRecv].empty()) pendingProbe.erase(*itRecv) ;
187  }
188
189
190  void CLegacyContextServer::checkPendingRequest(void)
191  {
192    map<int,MPI_Request>::iterator it;
193    list<int> recvRequest;
194    list<int>::iterator itRecv;
195    int rank;
196    int flag;
197    int count;
198    MPI_Status status;
199   
200    if (!pendingRequest.empty()) CTimer::get("receiving requests").resume();
201    else CTimer::get("receiving requests").suspend();
202
203    for(it=pendingRequest.begin();it!=pendingRequest.end();it++)
204    {
205      rank=it->first;
206      traceOff();
207      MPI_Test(& it->second, &flag, &status);
208      traceOn();
209      if (flag==true)
210      {
211        buffers[rank]->updateCurrentWindows() ;
212        recvRequest.push_back(rank);
213        MPI_Get_count(&status,MPI_CHAR,&count);
214        processRequest(rank,bufferRequest[rank],count);
215      }
216    }
217
218    for(itRecv=recvRequest.begin();itRecv!=recvRequest.end();itRecv++)
219    {
220      pendingRequest.erase(*itRecv);
221      bufferRequest.erase(*itRecv);
222    }
223  }
224
225  void CLegacyContextServer::getBufferFromClient(size_t timeLine)
226  {
227    CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ;
228
229    int rank ;
230    char *buffer ;
231    size_t count ; 
232
233    if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ;
234    for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine)
235    {
236      rank=itLastTimeLine->first ;
237      if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty())
238      {
239        if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count);
240        if (count >= 0) ++itLastTimeLine ;
241        break ;
242      }
243    }
244    CTimer::get("CLegacyContextServer::getBufferFromClient").suspend() ;
245  }
246         
247       
248  void CLegacyContextServer::processRequest(int rank, char* buff,int count)
249  {
250
251    CBufferIn buffer(buff,count);
252    char* startBuffer,endBuffer;
253    int size, offset;
254    size_t timeLine=0;
255    map<size_t,CEventServer*>::iterator it;
256
257   
258    CTimer::get("Process request").resume();
259    while(count>0)
260    {
261      char* startBuffer=(char*)buffer.ptr();
262      CBufferIn newBuffer(startBuffer,buffer.remain());
263      newBuffer>>size>>timeLine;
264
265      if (timeLine==timelineEventNotifyChangeBufferSize)
266      {
267        buffers[rank]->notifyBufferResizing() ;
268        buffers[rank]->updateCurrentWindows() ;
269        buffers[rank]->popBuffer(count) ;
270        info(100)<<"Context id "<<context->getId()<<" : Receive NotifyChangeBufferSize from client rank "<<rank<<endl
271                 <<"isBufferEmpty ? "<<buffers[rank]->isBufferEmpty()<<"  remaining count : "<<buffers[rank]->getUsed()<<endl;
272      } 
273      else if (timeLine==timelineEventChangeBufferSize)
274      {
275        size_t newSize ;
276        vector<MPI_Aint> winBufferAdress(2) ;
277        newBuffer>>newSize>>winBufferAdress[0]>>winBufferAdress[1] ;
278        buffers[rank]->freeBuffer(count) ;
279        delete buffers[rank] ;
280        windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ;
281        windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ;
282        buffers[rank] = new CServerBuffer(windows_[rank], winBufferAdress, 0, newSize) ;
283        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank
284                 <<"  newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ;
285      }
286      else
287      {
288        info(100)<<"Context id "<<context->getId()<<" : Receive standard event from client rank "<<rank<<"  with timeLine : "<<timeLine<<endl ;
289        it=events.find(timeLine);
290       
291        if (it==events.end()) it=events.insert(pair<int,CEventServer*>(timeLine,new CEventServer(this))).first;
292        it->second->push(rank,buffers[rank],startBuffer,size);
293        if (timeLine>0) lastTimeLine[rank]=timeLine ;
294      }
295      buffer.advance(size);
296      count=buffer.remain();
297    }
298   
299    CTimer::get("Process request").suspend();
300  }
301
302  void CLegacyContextServer::processEvents(bool enableEventsProcessing)
303  {
304    map<size_t,CEventServer*>::iterator it;
305    CEventServer* event;
306   
307    if (isProcessingEvent_) return ;
308
309    it=events.find(currentTimeLine);
310    if (it!=events.end())
311    {
312      event=it->second;
313
314      if (event->isFull())
315      {
316        if (!scheduled)
317        {
318          eventScheduler_->registerEvent(currentTimeLine,hashId);
319          info(100)<<"Context id "<<context->getId()<<"Schedule event : "<< currentTimeLine <<"  "<<hashId<<endl ;
320          scheduled=true;
321        }
322        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) )
323        {
324          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ;
325
326          if (!eventScheduled_) 
327          {
328            MPI_Ibarrier(processEventBarrier_,&processEventRequest_) ;
329            eventScheduled_=true ;
330            return ;
331          }
332          else 
333          {
334            MPI_Status status ;
335            int flag ;
336            MPI_Test(&processEventRequest_, &flag, &status) ;
337            if (!flag) return ;
338            eventScheduled_=false ;
339          }
340         
341          if (CXios::checkEventSync && context->getServiceType()!=CServicesManager::CLIENT)
342          {
343            int typeId, classId, typeId_in, classId_in;
344            long long timeLine_out;
345            long long timeLine_in( currentTimeLine );
346            typeId_in=event->type ;
347            classId_in=event->classId ;
348   //        MPI_Allreduce(&timeLine,&timeLine_out, 1, MPI_UINT64_T, MPI_SUM, intraComm) ; // MPI_UINT64_T standardized by MPI 3
349            MPI_Allreduce(&timeLine_in,&timeLine_out, 1, MPI_LONG_LONG_INT, MPI_SUM, intraComm) ; 
350            MPI_Allreduce(&typeId_in,&typeId, 1, MPI_INT, MPI_SUM, intraComm) ;
351            MPI_Allreduce(&classId_in,&classId, 1, MPI_INT, MPI_SUM, intraComm) ;
352            if (typeId/intraCommSize!=event->type || classId/intraCommSize!=event->classId || timeLine_out/intraCommSize!=currentTimeLine)
353            {
354               ERROR("void CLegacyContextClient::sendEvent(CEventClient& event)",
355                  << "Event are not coherent between client for timeline = "<<currentTimeLine);
356            }
357          }
358
359          isProcessingEvent_=true ;
360          CTimer::get("Process events").resume();
361          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ;
362          eventScheduler_->popEvent() ;
363          dispatchEvent(*event);
364          CTimer::get("Process events").suspend();
365          isProcessingEvent_=false ;
366          pendingEvent=false;
367          delete event;
368          events.erase(it);
369          currentTimeLine++;
370          scheduled = false;
371        }
372      }
373      else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ;
374    }
375    else if (pendingRequest.empty()) getBufferFromClient(currentTimeLine) ; // if pure one sided check buffer even if no event recorded at current time line
376  }
377
378  CLegacyContextServer::~CLegacyContextServer()
379  {
380    map<int,CServerBuffer*>::iterator it;
381    for(it=buffers.begin();it!=buffers.end();++it) delete it->second;
382    buffers.clear() ;
383  }
384
385  void CLegacyContextServer::releaseBuffers()
386  {
387    //for(auto it=buffers.begin();it!=buffers.end();++it) delete it->second ;
388    //buffers.clear() ;
389    freeWindows() ;
390  }
391
392  void CLegacyContextServer::freeWindows()
393  {
394    for(auto& it : winComm_)
395    {
396      int rank = it.first ;
397      delete windows_[rank][0];
398      delete windows_[rank][1];
399    }
400  }
401
402  void CLegacyContextServer::notifyClientsFinalize(void)
403  {
404    for(auto it=buffers.begin();it!=buffers.end();++it)
405    {
406      it->second->notifyClientFinalize() ;
407    }
408  }
409
410  void CLegacyContextServer::dispatchEvent(CEventServer& event)
411  {
412    string contextName;
413    string buff;
414    int MsgSize;
415    int rank;
416    list<CEventServer::SSubEvent>::iterator it;
417    StdString ctxId = context->getId();
418    CContext::setCurrent(ctxId);
419    StdSize totalBuf = 0;
420
421    if (event.classId==CContext::GetType() && event.type==CContext::EVENT_ID_CONTEXT_FINALIZE)
422    {
423      finished=true;
424      info(20)<<" CLegacyContextServer: Receive context <"<<context->getId()<<"> finalize."<<endl;
425      notifyClientsFinalize() ;
426      CTimer::get("receiving requests").suspend();
427      context->finalize();
428     
429      std::map<int, StdSize>::const_iterator itbMap = mapBufferSize_.begin(),
430                           iteMap = mapBufferSize_.end(), itMap;
431      for (itMap = itbMap; itMap != iteMap; ++itMap)
432      {
433        rank = itMap->first;
434        report(10)<< " Memory report : Context <"<<ctxId<<"> : server side : memory used for buffer of each connection to client" << endl
435            << "  +) With client of rank " << rank << " : " << itMap->second << " bytes " << endl;
436        totalBuf += itMap->second;
437      }
438      report(0)<< " Memory report : Context <"<<ctxId<<"> : server side : total memory used for buffer "<<totalBuf<<" bytes"<<endl;
439    }
440    else if (event.classId==CContext::GetType()) CContext::dispatchEvent(event);
441    else if (event.classId==CContextGroup::GetType()) CContextGroup::dispatchEvent(event);
442    else if (event.classId==CCalendarWrapper::GetType()) CCalendarWrapper::dispatchEvent(event);
443    else if (event.classId==CDomain::GetType()) CDomain::dispatchEvent(event);
444    else if (event.classId==CDomainGroup::GetType()) CDomainGroup::dispatchEvent(event);
445    else if (event.classId==CAxis::GetType()) CAxis::dispatchEvent(event);
446    else if (event.classId==CAxisGroup::GetType()) CAxisGroup::dispatchEvent(event);
447    else if (event.classId==CScalar::GetType()) CScalar::dispatchEvent(event);
448    else if (event.classId==CScalarGroup::GetType()) CScalarGroup::dispatchEvent(event);
449    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event);
450    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event);
451    else if (event.classId==CField::GetType()) CField::dispatchEvent(event);
452    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event);
453    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event);
454    else if (event.classId==CFileGroup::GetType()) CFileGroup::dispatchEvent(event);
455    else if (event.classId==CVariable::GetType()) CVariable::dispatchEvent(event);
456    else
457    {
458      ERROR("void CLegacyContextServer::dispatchEvent(CEventServer& event)",<<" Bad event class Id"<<endl);
459    }
460  }
461
462  bool CLegacyContextServer::isCollectiveEvent(CEventServer& event)
463  {
464    if (event.type>1000) return false ;
465    else return true ;
466  }
467}
Note: See TracBrowser for help on using the repository browser.