#include "temporal_filter.hpp" #include "functor_type.hpp" #include "calendar_util.hpp" #include "workflow_graph.hpp" #include "file.hpp" namespace xios { static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray& tmpData); CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId, const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq, bool ignoreMissingValue /*= false*/) : CFilter(gc, 1, this) , functor(createFunctor(opId, ignoreMissingValue, tmpData)) , isOnceOperation(functor->timeType() == func::CFunctor::once) , isInstantOperation(functor->timeType() == func::CFunctor::instant) , samplingFreq(samplingFreq) , samplingOffset(samplingOffset) , opFreq(opFreq) , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0) , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day, this->samplingOffset.hour, this->samplingOffset.minute, this->samplingOffset.second, this->samplingOffset.timestep) , initDate(initDate) , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep())) , nbOperationDates(1) , nbSamplingDates(0) // , nextOperationDate(initDate + opFreq + this->samplingOffset) , isFirstOperation(true) , temp_op(opId) { } bool CTemporalFilter::buildGraph(std::vector data) { bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false; if(building_graph) { if(this->filterIDoutputs.size()==0) this->filterID = InvalidableObject::filterIdGenerator++; int edgeID = InvalidableObject::edgeIdGenerator++; // std::cout<<"CTemporalFilter::apply filter tag = "<tag<<" start = "<start_graph<<" end = "<end_graph<filterIDoutputs.size()==0) { CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]); (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op; (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = false ; (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID = 1 ; (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = (data[0]->distance); (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "
file attributes :
" +this->field->file->record4graphXiosAttributes(); } if(CWorkflowGraph::build_begin) { CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]); (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb += 1 ; (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = max(data[0]->distance+1, (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance); } this->filterIDoutputs.push_back(data[0]->src_filterID); } return building_graph; } CDataPacketPtr CTemporalFilter::apply(std::vector data) { bool BG = buildGraph(data); CDataPacketPtr packet=0; if (data[0]->status != CDataPacket::END_OF_STREAM) { bool usePacket, outputResult, copyLess; if (isOnceOperation) usePacket = outputResult = copyLess = isFirstOperation; else { usePacket = (data[0]->date >= nextSamplingDate); outputResult = (data[0]->date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth); copyLess = (isInstantOperation && usePacket && outputResult); } if (usePacket) { nbSamplingDates ++; if (!copyLess) { if (!tmpData.numElements()) tmpData.resize(data[0]->data.numElements()); (*functor)(data[0]->data); } nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep(); } if (outputResult) { nbOperationDates ++; if (!copyLess) { functor->final(); packet = CDataPacketPtr(new CDataPacket); packet->date = data[0]->date; packet->timestamp = data[0]->timestamp; packet->status = data[0]->status; packet->data.resize(tmpData.numElements()); packet->data = tmpData; } else packet = data[0]; isFirstOperation = false; packet->field = this->field; if(BG) { packet->src_filterID=this->filterID; packet->distance = data[0]->distance+1; this->filterIDoutputs.clear(); CWorkflowGraph::build_begin=true; (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ; } } } return packet; } bool CTemporalFilter::mustAutoTrigger() const { return true; } bool CTemporalFilter::isDataExpected(const CDate& date) const { // return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate); return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth); } static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray& tmpData) { func::CFunctor* functor = NULL; double defaultValue = std::numeric_limits::quiet_NaN(); #define DECLARE_FUNCTOR(MType, mtype) \ if (opId.compare(#mtype) == 0) \ { \ if (ignoreMissingValue) \ { \ functor = new func::C##MType(tmpData, defaultValue); \ } \ else \ { \ functor = new func::C##MType(tmpData); \ } \ } #include "functor_type.conf" if (!functor) ERROR("createFunctor(const std::string& opId, ...)", << "\"" << opId << "\" is not a valid operation."); return functor; } } // namespace xios