source: XIOS3/trunk/src/filter/file_reader_source_filter.cpp @ 2628

Last change on this file since 2628 was 2628, checked in by jderouillat, 7 weeks ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 2.9 KB
Line 
1#include "file_reader_source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include "context.hpp"
6#include "field.hpp"
7#include "file.hpp"
8#include "context.hpp"
9#include "workflow_graph.hpp"
10
11namespace xios
12{
13  extern CLogType logProfile ;
14  CFileReaderSourceFilter::CFileReaderSourceFilter(CGarbageCollector& gc, CField* field)
15    : COutputPin(gc)
16  {
17    field_ = field ;
18    grid_ = field->getGrid() ;
19    file_ = field->getFileIn() ;
20    if (!file_->cyclic.isEmpty()) isCyclic_ = file_->cyclic ;
21    if (!field_->scale_factor.isEmpty()) { hasScaleFactor_=true ; scaleFactor_ = field_->scale_factor ; }
22    if (!field_->add_offset.isEmpty()) { hasAddOffset_=true ; addOffset_ = field_->add_offset ; }
23  }
24
25  void CFileReaderSourceFilter::streamData()
26  {
27    if (info.isActive(logProfile)) CTimer::get("Reader workflow data entry").resume();
28    Time timeStamp ;
29    CDataPacketPtr packet(new CDataPacket);
30    packet->date = CContext::getCurrent()->getCalendar()->getCurrentDate();
31    packet->timestamp = timeStamp;
32    packet->status = CDataPacket::NO_ERROR;
33
34    if (!isInitialized_)  initialize() ;
35    CField::EReadField readState = CField::RF_DATA;
36    if ( nStepMax_==0 || (nStep_ >= nStepMax_ && !isCyclic_)) readState = CField::RF_EOF;
37 
38    if (CField::RF_EOF != readState)
39    {
40      if (!file_->isEmptyZone()) readData(packet->data) ;
41      else readState = CField::RF_NODATA;
42    }
43    nStep_++ ;
44   
45    if (readState == CField::RF_DATA) packet->status = CDataPacket::NO_ERROR;
46    else packet->status = CDataPacket::END_OF_STREAM;
47   
48    info(20)<<"Read data from file : FieldId "<<field_->getId()<<"  nStep "<<nStep_<<"  date : "<<packet->date<<endl ;
49 
50    if(this->graphEnabled)
51    {
52      this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
53      CWorkflowGraph::addNode("File Reader Source filter", 1, false, 0, packet);
54    }
55    if (info.isActive(logProfile)) CTimer::get("Reader workflow data entry").suspend();
56           
57    onOutputReady(packet);
58  }
59
60  void CFileReaderSourceFilter::initialize()
61  {
62    CContext* context = CContext::getCurrent();
63    file_->initRead();
64    if (!file_->isEmptyZone())
65    {     
66      file_->checkReadFile();
67      nStepMax_ = file_->getDataInput()->getFieldNbRecords(field_);
68      nStep_ = file_->record_offset.isEmpty() ? 0 : file_->record_offset; ;
69    }
70    MPI_Allreduce(MPI_IN_PLACE, &nStepMax_, 1, MPI_INT, MPI_MAX, context->getIntraComm());
71    isInitialized_=true;
72  }
73
74  void CFileReaderSourceFilter::readData(CArray<double,1>& data)
75  {
76    shared_ptr<CGridLocalConnector> connector = grid_->getFullToWorkflowConnector() ;
77    CArray<double,1> dataIn(connector->getSrcSize()) ;
78    file_->getDataInput()->readFieldData(field_, nStep_%nStepMax_, dataIn);
79    data.resize(connector->getDstSize()) ;
80    connector->transfer(dataIn, data) ; 
81
82    if (hasScaleFactor_ || hasAddOffset_) data = data * scaleFactor_ + addOffset_; // possibility of optimization
83  }
84 
85 
86} // namespace xios
Note: See TracBrowser for help on using the repository browser.