source: XIOS/dev/dev_trunk_graph/src/filter/temporal_filter.cpp @ 2023

Last change on this file since 2023 was 2023, checked in by yushan, 3 years ago

Graph intermediate commit to a tmp branch.

File size: 5.0 KB
Line 
1#include "temporal_filter.hpp"
2#include "functor_type.hpp"
3#include "calendar_util.hpp"
4#include "workflow_graph.hpp"
5
6namespace xios
7{
8  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
9
10  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
11                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
12                                   bool ignoreMissingValue /*= false*/)
13    : CFilter(gc, 1, this)
14    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
15    , isOnceOperation(functor->timeType() == func::CFunctor::once)
16    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
17    , samplingFreq(samplingFreq)
18    , samplingOffset(samplingOffset)
19    , opFreq(opFreq)
20    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
21    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
22                        this->samplingOffset.hour, this->samplingOffset.minute,
23                        this->samplingOffset.second, this->samplingOffset.timestep)
24    , initDate(initDate)
25    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
26    , nbOperationDates(1)
27    , nbSamplingDates(0)
28    , isFirstOperation(true)
29    , graphCycleCompleted(true)
30  {
31  }
32
33  void CTemporalFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data)
34  {
35    if(this->graphEnabled )
36    {
37      if(!data[0]->graphPackage)
38      {
39        data[0]->graphPackage = new CGraphDataPackage;
40      }
41     
42      //if(this->graphPackage->sourceFilterIds.size()==0)
43      if(graphCycleCompleted)
44      { 
45        this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
46        CWorkflowGraph::addNode("Temporal filter", 3, false, 0, data[0]);
47        graphCycleCompleted = false;
48      }
49     
50      CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]);
51      data[0]->graphPackage->fromFilter = this->graphPackage->filterId;
52      this->graphPackage->sourceFilterIds.push_back(data[0]->graphPackage->fromFilter); 
53      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
54      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
55    }
56
57  }
58  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
59  {
60    buildWorkflowGraph(data);
61 
62    CDataPacketPtr packet;
63
64    if (data[0]->status != CDataPacket::END_OF_STREAM)
65    {
66      bool usePacket, outputResult, copyLess;
67      if (isOnceOperation)
68        usePacket = outputResult = copyLess = isFirstOperation;
69      else
70      {
71        usePacket = (data[0]->date >= nextSamplingDate);
72        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
73        copyLess = (isInstantOperation && usePacket && outputResult);
74      }
75
76      if (usePacket)
77      {
78        nbSamplingDates ++;
79        if (!copyLess)
80        {
81          if (!tmpData.numElements())
82            tmpData.resize(data[0]->data.numElements());
83
84          (*functor)(data[0]->data);
85        }
86
87        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
88      }
89
90      if (outputResult)
91      {
92        nbOperationDates ++;
93        if (!copyLess)
94        {
95          functor->final();
96
97          packet = CDataPacketPtr(new CDataPacket);
98          packet->date = data[0]->date;
99          packet->timestamp = data[0]->timestamp;
100          packet->status = data[0]->status;
101          packet->data.resize(tmpData.numElements());
102          packet->data = tmpData;
103          packet->graphPackage = data[0]->graphPackage;
104        }
105        else
106          packet = data[0];
107
108        isFirstOperation = false;
109       
110        graphCycleCompleted = true;
111      }
112    }
113
114    return packet;
115  }
116
117  bool CTemporalFilter::mustAutoTrigger() const
118  {
119    return true;
120  }
121
122  bool CTemporalFilter::isDataExpected(const CDate& date) const
123  {
124    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
125  }
126
127  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
128  {
129    func::CFunctor* functor = NULL;
130
131    double defaultValue = std::numeric_limits<double>::quiet_NaN();
132
133#define DECLARE_FUNCTOR(MType, mtype) \
134    if (opId.compare(#mtype) == 0) \
135    { \
136      if (ignoreMissingValue) \
137      { \
138        functor = new func::C##MType(tmpData, defaultValue); \
139      } \
140      else \
141      { \
142        functor = new func::C##MType(tmpData); \
143      } \
144    }
145
146#include "functor_type.conf"
147
148    if (!functor)
149      ERROR("createFunctor(const std::string& opId, ...)",
150            << "\"" << opId << "\" is not a valid operation.");
151
152    return functor;
153  }
154} // namespace xios
Note: See TracBrowser for help on using the repository browser.