Ignore:
Timestamp:
09/16/20 18:34:23 (4 years ago)
Author:
ymipsl
Message:

Big update on on going work related to data distribution and transfer between clients and servers.
Revisite of the source and store filter using "connectors".

-> inputs work again

YM

Location:
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter
Files:
2 added
2 deleted
5 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_client_source_filter.cpp

    r1930 r1934  
    1414    { 
    1515      CContext* context = CContext::getCurrent(); 
     16      field_=field ; 
    1617      grid_= field-> getGrid(); 
    1718 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_client_source_filter.hpp

    r1930 r1934  
    3737 
    3838    private: 
     39      CField* field_ ; 
    3940      CGrid* grid_;             //!< The grid attached to the data the filter can accept 
    4041      CDuration freqOp_ ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.cpp

    r1930 r1934  
    66#include "calendar_util.hpp" 
    77#include "context.hpp" 
     8#include "event_client.hpp" 
     9#include "timer.hpp" 
     10#include "tracer.hpp" 
    811#include <limits>  
    912 
     
    1417  { 
    1518    CContext* context = CContext::getCurrent(); 
     19    field_ = field ; 
    1620    grid_= field->getGrid(); 
    17     freqOp_ = field->fileIn_->output_freq ; 
     21    freqOp_ = field->getRelFile()->output_freq ; 
     22    client_= field->getRelFile()->getContextClient() ; 
    1823    lastDateReceived_ = context->getCalendar()->getInitDate(); 
    1924    offset_ = field->freq_offset ; 
     
    3944      if (!wasEOF) dateEOF_ = lastDateReceived_; 
    4045      packet->status = CDataPacket::END_OF_STREAM; 
     46      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 
     47      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<"  ----> EOF"<<endl;  
     48 
    4149    } 
    4250    else  
    4351    { 
    44       grid_->getServerFromClientConnector()->transfer(event, packet->data) ; 
     52      CContextClient* client = event.getContextServer()->getAssociatedClient() ; 
     53      grid_->getClientFromServerConnector(client)->transfer(event, packet->data) ; // to avoid to make a search in map for corresponding client connector,  
     54      
     55      info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 
     56      info(20)<<"lastDateReceived_ "<<lastDateReceived_<< "  date "<<packet->date<<endl;                                                                                    // make a registration at initialization once 
    4557      packet->status = CDataPacket::NO_ERROR; 
    4658    } 
     
    4961  } 
    5062  
     63  int CClientFromServerSourceFilter::sendReadDataRequest(const CDate& tsDataRequested) 
     64  { 
     65    CContext* context = CContext::getCurrent(); 
     66    lastDataRequestedFromServer_ = tsDataRequested; 
     67 
     68    // No need to send the request if we are sure that we are already at EOF 
     69    if (!isEOF_ || context->getCalendar()->getCurrentDate() <= dateEOF_) 
     70    { 
     71      CEventClient event(field_->getType(), CField::EVENT_ID_READ_DATA); 
     72      if (client_->isServerLeader()) 
     73      { 
     74        CMessage msg; 
     75        msg << field_->getId(); 
     76        for(auto& rank : client_->getRanksServerLeader()) event.push(rank, 1, msg); 
     77        client_->sendEvent(event); 
     78      } 
     79      else client_->sendEvent(event); 
     80    } 
     81    else  
     82    { 
     83      CDataPacketPtr packet(new CDataPacket); 
     84      packet->date = tsDataRequested; 
     85      packet->timestamp = packet->date ; 
     86      packet->status = CDataPacket::END_OF_STREAM; 
     87      onOutputReady(packet); 
     88    } 
     89 
     90    wasDataRequestedFromServer_ = true; 
     91 
     92    return !isEOF_; 
     93  } 
     94 
     95  bool CClientFromServerSourceFilter::sendReadDataRequestIfNeeded(void) 
     96  TRY 
     97  { 
     98    const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 
     99 
     100    bool dataRequested = false; 
     101 
     102    while (currentDate >= lastDataRequestedFromServer_) 
     103    { 
     104      info(20) << "currentDate : " << currentDate << endl ; 
     105      info(20) << "Field : " << field_->getId() << endl ; 
     106      info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer_ << endl ; 
     107      info(20) << "freqOp : " << freqOp_ << endl ; 
     108      info(20) << "lastDataRequestedFromServer + fileIn_->output_freq.getValue() : " << lastDataRequestedFromServer_ + freqOp_ << endl ; 
     109 
     110      dataRequested |= sendReadDataRequest(lastDataRequestedFromServer_ + freqOp_); 
     111    } 
     112 
     113    return dataRequested; 
     114  } 
     115  CATCH 
     116 
     117  void CClientFromServerSourceFilter::checkForLateData(void) 
     118  TRY 
     119  { 
     120    CContext* context = CContext::getCurrent(); 
     121    // Check if data previously requested has been received as expected 
     122    if (wasDataRequestedFromServer_ && ! isEOF_) 
     123    { 
     124      CTimer timer("CClientFromServerSourceFilter::checkForLateDataFromServer"); 
     125      timer.resume(); 
     126      traceOff() ; 
     127      timer.suspend(); 
     128       
     129      bool isLate; 
     130      do 
     131      { 
     132        isLate = isDataLate(); 
     133        if (isLate) 
     134        { 
     135          timer.resume(); 
     136          context->globalEventLoop(); 
     137          timer.suspend(); 
     138        } 
     139      } 
     140      while (isLate && timer.getCumulatedTime() < CXios::recvFieldTimeout); 
     141      timer.resume(); 
     142      traceOn() ; 
     143      timer.suspend() ; 
     144 
     145 
     146      if (isLate) 
     147        ERROR("void CClientFromServerSourceFilter::checkForLateDataFromServer(void)", 
     148              << "Late data at timestep = " << context->getCalendar()->getCurrentDate()); 
     149    } 
     150  } 
     151  CATCH 
     152 
     153   
    51154  bool CClientFromServerSourceFilter::isDataLate(void) 
    52155  { 
     
    60163     
    61164  } 
     165 
     166 
    62167} // namespace xios 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.hpp

    r1930 r1934  
    66#include "output_pin.hpp" 
    77#include "event_server.hpp" 
     8#include "context_client.hpp" 
    89#include "calendar_util.hpp" 
    910 
     
    3839      bool isDataLate(void) ; 
    3940      bool isEOF() {return isEOF_ ;} 
     41      int  sendReadDataRequest(const CDate& tsDataRequested) ; 
     42      bool sendReadDataRequestIfNeeded(void) ; 
     43      void checkForLateData(void) ; 
    4044 
    4145     private: 
     46      CField* field_; 
    4247      CGrid* grid_;             //!< The grid attached to the data the filter can accept 
    4348      CDuration freqOp_ ; 
    4449      CDuration offset_ ; 
    45  
     50      CContextClient* client_ = nullptr ; 
    4651      bool wasDataAlreadyReceived_= false ; 
    4752      CDate lastDateReceived_ ; 
     53      bool wasDataRequestedFromServer_ = false ; 
     54      CDate lastDataRequestedFromServer_ ; 
     55 
    4856      bool isEOF_ = false ; 
    4957      CDate dateEOF_ ; 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/file_reader_source_filter.cpp

    r1930 r1934  
    3030    packet->status = CDataPacket::NO_ERROR; 
    3131 
    32     CArray<double,1> data ; 
    3332    if (!isInitialized_)  initialize() ; 
    3433    CField::EReadField readState = CField::RF_DATA; 
    35  
    3634    if ( nStepMax_==0 || (nStep_ >= nStepMax_ && !isCyclic_)) readState = CField::RF_EOF; 
    3735   
    3836    if (CField::RF_EOF != readState) 
    3937    { 
    40       if (file_->isEmptyZone()) readData(data) ; 
     38      if (!file_->isEmptyZone()) readData(packet->data) ; 
    4139      else readState = CField::RF_NODATA; 
    4240    } 
     
    4543    if (readState == CField::RF_DATA) packet->status = CDataPacket::NO_ERROR; 
    4644    else packet->status = CDataPacket::END_OF_STREAM; 
    47              
     45     
     46    info(20)<<"Read data from file : FieldId "<<field_->getId()<<"  nStep "<<nStep_<<"  date : "<<packet->date<<endl ; 
     47            
    4848    onOutputReady(packet); 
    4949  } 
     
    5353    CContext* context = CContext::getCurrent(); 
    5454    file_->initRead(); 
    55     if (file_->isEmptyZone()) 
     55    if (!file_->isEmptyZone()) 
    5656    {       
    5757      file_->checkReadFile(); 
     
    6060    } 
    6161    MPI_Allreduce(MPI_IN_PLACE, &nStepMax_, 1, MPI_INT, MPI_MAX, context->getIntraComm()); 
     62    isInitialized_=true; 
    6263  } 
    6364 
     
    6566  { 
    6667    CGridLocalConnector*  connector = grid_->getFullToWorkflowConnector() ; 
    67     CArray<double,1> dataIn(connector->getDstSize()) ; 
     68    CArray<double,1> dataIn(connector->getSrcSize()) ; 
    6869    file_->getDataInput()->readFieldData(field_, nStep_%nStepMax_, dataIn); 
     70    data.resize(connector->getDstSize()) ; 
    6971    connector->transfer(dataIn, data) ;  
    7072 
Note: See TracChangeset for help on using the changeset viewer.