source: XIOS/trunk/src/filter/temporal_filter.cpp @ 1704

Last change on this file since 1704 was 1704, checked in by yushan, 5 years ago

Introducing the new graph functionality. Attribute build_workflow_graph=.TRUE. is used in the field definition section in the xml file to enable the workflow graph of the field and other fields referecing to it. A more detailed document will be available soon on the graph fuctionality.

File size: 6.5 KB
Line 
1#include "temporal_filter.hpp"
2#include "functor_type.hpp"
3#include "calendar_util.hpp"
4#include "workflow_graph.hpp"
5#include "file.hpp"
6
7namespace xios
8{
9  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
10
11  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
12                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
13                                   bool ignoreMissingValue /*= false*/)
14    : CFilter(gc, 1, this)
15    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
16    , isOnceOperation(functor->timeType() == func::CFunctor::once)
17    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
18    , samplingFreq(samplingFreq)
19    , samplingOffset(samplingOffset)
20    , opFreq(opFreq)
21    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
22    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
23                        this->samplingOffset.hour, this->samplingOffset.minute,
24                        this->samplingOffset.second, this->samplingOffset.timestep)
25    , initDate(initDate)
26    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
27    , nbOperationDates(1)
28    , nbSamplingDates(0)
29//    , nextOperationDate(initDate + opFreq + this->samplingOffset)
30    , isFirstOperation(true)
31    , temp_op(opId)
32  {
33  }
34
35 
36
37
38
39  bool CTemporalFilter::buildGraph(std::vector<CDataPacketPtr> data)
40  {
41    bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
42   
43    if(building_graph)
44    {
45      if(this->filterIDoutputs.size()==0) this->filterID = InvalidableObject::filterIdGenerator++;
46      int edgeID = InvalidableObject::edgeIdGenerator++;
47     
48      // std::cout<<"CTemporalFilter::apply filter tag = "<<this->tag<<" start = "<<this->start_graph<<" end = "<<this->end_graph<<std::endl;
49
50      CWorkflowGraph::allocNodeEdge();
51
52      if(this->filterIDoutputs.size()==0)
53      {
54        CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]);   
55        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;   
56        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = false ;
57        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].clusterID = 1 ;
58        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = (data[0]->distance);
59
60
61        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
62        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes();
63      }
64
65      if(CWorkflowGraph::build_begin)
66      {
67
68        CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
69
70        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;
71        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb += 1 ;
72        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = max(data[0]->distance+1, (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance);
73      }
74
75
76      this->filterIDoutputs.push_back(data[0]->src_filterID); 
77    }
78
79    return building_graph;
80  }
81
82
83  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
84  {
85    bool BG = buildGraph(data);
86
87    CDataPacketPtr packet;
88
89    if (data[0]->status != CDataPacket::END_OF_STREAM)
90    {
91      bool usePacket, outputResult, copyLess;
92      if (isOnceOperation)
93        usePacket = outputResult = copyLess = isFirstOperation;
94      else
95      {
96        usePacket = (data[0]->date >= nextSamplingDate);
97        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
98        copyLess = (isInstantOperation && usePacket && outputResult);
99      }
100
101      if (usePacket)
102      {
103        nbSamplingDates ++;
104        if (!copyLess)
105        {
106          if (!tmpData.numElements())
107            tmpData.resize(data[0]->data.numElements());
108
109          (*functor)(data[0]->data);
110        }
111
112        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
113      }
114
115      if (outputResult)
116      {
117        nbOperationDates ++;
118        if (!copyLess)
119        {
120          functor->final();
121
122          packet = CDataPacketPtr(new CDataPacket);
123          packet->date = data[0]->date;
124          packet->timestamp = data[0]->timestamp;
125          packet->status = data[0]->status;
126          packet->data.resize(tmpData.numElements());
127          packet->data = tmpData;
128        }
129        else
130          packet = data[0];
131
132        isFirstOperation = false;
133       
134        packet->field = this->field;
135       
136        if(BG)
137        {
138          packet->src_filterID=this->filterID;
139          packet->distance = data[0]->distance+1;
140          this->filterIDoutputs.clear();
141          CWorkflowGraph::build_begin=true;
142          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ;
143        }
144      }
145    }
146
147    return packet;
148  }
149
150  bool CTemporalFilter::mustAutoTrigger() const
151  {
152    return true;
153  }
154
155  bool CTemporalFilter::isDataExpected(const CDate& date) const
156  {
157//    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate);
158    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
159  }
160
161  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
162  {
163    func::CFunctor* functor = NULL;
164
165    double defaultValue = std::numeric_limits<double>::quiet_NaN();
166
167#define DECLARE_FUNCTOR(MType, mtype) \
168    if (opId.compare(#mtype) == 0) \
169    { \
170      if (ignoreMissingValue) \
171      { \
172        functor = new func::C##MType(tmpData, defaultValue); \
173      } \
174      else \
175      { \
176        functor = new func::C##MType(tmpData); \
177      } \
178    }
179
180#include "functor_type.conf"
181
182    if (!functor)
183      ERROR("createFunctor(const std::string& opId, ...)",
184            << "\"" << opId << "\" is not a valid operation.");
185
186    return functor;
187  }
188} // namespace xios
Note: See TracBrowser for help on using the repository browser.