Changeset 1883


Ignore:
Timestamp:
05/19/20 16:44:47 (3 months ago)
Author:
ymipsl
Message:

XIOS coupling branch
Adopt infrastructure based on filter for reading data on server side and sending it to the client, in a similar way on what is done for other case.

YM

Location:
XIOS/dev/dev_ym/XIOS_COUPLING/src
Files:
4 added
3 edited

Legend:

Unmodified
Added
Removed
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp

    r1878 r1883  
    11661166    else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 
    11671167 
     1168    // client side, assign context for file reading 
    11681169    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ; 
     1170     
     1171    // server side, assign context where to send file data read 
     1172    if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER)  
     1173      for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ; 
    11691174    
    11701175    // workflow endpoint => sent to IO/SERVER 
     
    11921197    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 
    11931198    { 
    1194       // no filter to send data from server to client => to be implemented (reading case) 
     1199      for(auto field : fileInField)  
     1200      { 
     1201        field->connectToServerToClient(garbageCollector) ; 
     1202      } 
    11951203    } 
    11961204 
     
    12471255    if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 
    12481256    { 
    1249       // no filter for reading data from file => to be implemented 
     1257      for(auto field : fileInField)  
     1258      { 
     1259        field->connectToFileReader(garbageCollector) ; 
     1260      } 
    12501261    } 
    12511262     
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp

    r1882 r1883  
    2424#include "spatial_transform_filter.hpp" 
    2525#include "file_server_writer_filter.hpp" 
     26#include "file_server_reader_filter.hpp" 
     27#include "server_to_client_filter.hpp" 
    2628#include "tracer.hpp" 
    2729 
     
    435437  TRY 
    436438  { 
    437     CContextClient* client = server->getAssociatedClient() ; 
     439         
     440    CArray<double,1> data ; 
     441    EReadField hasData = readField(data); 
     442    CDate date = CContext::getCurrent()->getCalendar()->getCurrentDate(); 
     443    if (hasData == RF_DATA) fileServerReaderFilter_->streamData(date,data) ; 
     444    else fileServerReaderFilter_->signalEndOfStream(date) ; 
     445 
     446  } 
     447  CATCH_DUMP_ATTR 
     448 
     449 
     450  void CField::sendUpdateDataServerToClient(bool isEOF, const CArray<double,1>& data, CContextClient* client) 
     451  TRY 
     452  { 
    438453    CEventClient event(getType(), EVENT_ID_READ_DATA_READY); 
    439454    std::list<CMessage> msgs; 
    440      
    441     CArray<double,1> data ; 
    442     EReadField hasData = readField(data); 
    443  
    444     map<int, CArray<double,1> >::iterator it; 
    445455    if (!grid_->doGridHaveDataDistributed(client)) 
    446456    { 
     
    455465              CMessage& msg = msgs.back(); 
    456466              msg << getId(); 
    457               switch (hasData) 
    458               { 
    459                 case RF_DATA: 
    460                   msg << getNStep() - 1 << data; 
    461                   break; 
    462                 case RF_NODATA: 
    463                   msg << int(-2) << data; 
    464                   break; 
    465                 case RF_EOF:                   
    466                 default: 
    467                   msg << int(-1); 
    468                   break; 
    469               } 
    470  
     467               
     468              if (isEOF)  msg << int(-1); 
     469              else msg << getNStep() - 1 << data; 
     470          
    471471              event.push(*itRank, 1, msg); 
    472472            } 
     
    484484      for (auto it = outLocalIndexStoreOnClient.begin(); it != outLocalIndexStoreOnClient.end(); ++it) 
    485485      { 
    486         CArray<size_t,1>& indexTmp = it->second; 
    487         CArray<double,1> tmp(indexTmp.numElements()); 
    488         for (int idx = 0; idx < indexTmp.numElements(); ++idx) 
    489         { 
    490           tmp(idx) = data(indexTmp(idx)); 
    491         }  
     486         
    492487 
    493488        msgs.push_back(CMessage()); 
    494489        CMessage& msg = msgs.back(); 
    495490        msg << getId(); 
    496         switch (hasData) 
     491        if (isEOF) msg << int(-1); 
     492        else  
    497493        { 
    498           case RF_DATA: 
    499             msg << getNStep() - 1 << tmp; 
    500             break; 
    501           case RF_NODATA: 
    502             msg << int(-2) << tmp; 
    503             break; 
    504           case RF_EOF:                   
    505           default: 
    506             msg << int(-1); 
    507             break; 
     494          CArray<size_t,1>& indexTmp = it->second; 
     495          CArray<double,1> tmp(indexTmp.numElements()); 
     496          for (int idx = 0; idx < indexTmp.numElements(); ++idx) tmp(idx) = data(indexTmp(idx)); 
     497          msg << getNStep() - 1 << tmp; 
    508498        } 
    509  
    510499        event.push(it->first, grid_->getNbReadSenders(client)[it->first], msg); 
    511500      } 
     
    14811470  }  
    14821471   
     1472  /*! 
     1473   * Connect field to a file reader filter to read data from file (on server side). 
     1474   */ 
     1475  void CField::connectToFileReader(CGarbageCollector& gc) 
     1476  { 
     1477    fileServerReaderFilter_ = std::shared_ptr<CFileServerReaderFilter>(new CFileServerReaderFilter(gc, this)); 
     1478    fileServerReaderFilter_->connectOutput(inputFilter, 0); 
     1479  }  
    14831480 
    14841481  /*! 
     
    14921489    storeFilter = std::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid_, detectMissingValues, defaultValue)); 
    14931490    instantDataFilter->connectOutput(storeFilter, 0); 
     1491  } 
     1492 
     1493 
     1494  
     1495  void CField::connectToServerToClient(CGarbageCollector& gc) 
     1496  { 
     1497    serverToClientFilter_ = std::shared_ptr<CServerToClientFilter>(new CServerToClientFilter(gc, this, client)); 
     1498    instantDataFilter->connectOutput(serverToClientFilter_, 0); 
    14941499  } 
    14951500 
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.hpp

    r1882 r1883  
    4343   class CFileWriterFilter; 
    4444   class CFileServerWriterFilter; 
    45  
     45   class CFileServerReaderFilter; 
     46   class CServerToClientFilter; 
    4647   ///-------------------------------------------------------------- 
    4748 
     
    183184        static bool dispatchEvent(CEventServer& event); 
    184185        void sendAllAttributesToServer(CContextClient* client) ;  
    185         void sendUpdateData(const CArray<double,1>& data); 
    186186        void sendUpdateData(Time timestamp, const CArray<double,1>& data, CContextClient* client); 
     187        void sendUpdateDataServerToClient(bool isEOF, const CArray<double,1>& data, CContextClient* client) ; 
     188 
    187189        static void recvUpdateData(CEventServer& event); 
    188190        void recvUpdateData(std::map<int,CBufferIn*>& rankBuffers); 
     
    250252        void connectToServerInput(CGarbageCollector& gc) ; 
    251253        void connectToModelOutput(CGarbageCollector& gc); 
     254        void connectToFileReader(CGarbageCollector& gc) ; 
     255        void connectToServerToClient(CGarbageCollector& gc) ; 
    252256 
    253257        void computeGridIndexToFileServer(void) ; 
     
    402406         //! The terminal filter which writes data to file 
    403407         std::shared_ptr<CFileServerWriterFilter> fileServerWriterFilter; 
     408 
     409         //! The source filter which read data from file 
     410         std::shared_ptr<CFileServerReaderFilter> fileServerReaderFilter_; 
     411 
     412         //! The terminal filter which send data from file server to client 
     413         std::shared_ptr<CServerToClientFilter> serverToClientFilter_; 
     414 
     415 
    404416   }; // class CField 
    405417 
Note: See TracChangeset for help on using the changeset viewer.