source: XIOS/dev/dev_trunk_omp/src/filter/temporal_filter.cpp @ 1680

Last change on this file since 1680 was 1680, checked in by yushan, 5 years ago

MARK: Dynamic workflow graph development. Branch up to date with trunk @1676. Arithmetic filter unified

File size: 9.0 KB
Line 
#include "temporal_filter.hpp"

#include <cstddef>
#include <utility>

#include "functor_type.hpp"
#include "calendar_util.hpp"
#include "workflow_graph.hpp"
#include "file.hpp"
6
7namespace xios
8{
9  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
10
11  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
12                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
13                                   bool ignoreMissingValue /*= false*/)
14    : CFilter(gc, 1, this)
15    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
16    , isOnceOperation(functor->timeType() == func::CFunctor::once)
17    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
18    , samplingFreq(samplingFreq)
19    , samplingOffset(samplingOffset)
20    , opFreq(opFreq)
21    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
22    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
23                        this->samplingOffset.hour, this->samplingOffset.minute,
24                        this->samplingOffset.second, this->samplingOffset.timestep)
25    , initDate(initDate)
26    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
27    , nbOperationDates(1)
28    , nbSamplingDates(0)
29//    , nextOperationDate(initDate + opFreq + this->samplingOffset)
30    , isFirstOperation(true)
31    , temp_op(opId)
32  {
33  }
34
35  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
36  {
37    bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
38
39    if(building_graph)
40    {
41      int edgeID = InvalidableObject::edgeIdGenerator++;
42
43      CWorkflowGraph::allocNodeEdge();
44
45      if(CWorkflowGraph::build_begin)
46      {
47        CWorkflowGraph::addEdge(edgeID, -1, data[0]);       
48      }
49      this->filterIDoutputs_pair.push_back(make_pair(edgeID, data[0]->src_filterID)); 
50    }
51   
52   
53    CDataPacketPtr packet;
54
55    if (data[0]->status != CDataPacket::END_OF_STREAM)
56    {
57      bool usePacket, outputResult, copyLess;
58      if (isOnceOperation)
59        usePacket = outputResult = copyLess = isFirstOperation;
60      else
61      {
62        usePacket = (data[0]->date >= nextSamplingDate);
63        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
64        copyLess = (isInstantOperation && usePacket && outputResult);
65      }
66
67      if (usePacket)
68      {
69        nbSamplingDates ++;
70        if (!copyLess)
71        {
72          if (!tmpData.numElements())
73            tmpData.resize(data[0]->data.numElements());
74
75          (*functor)(data[0]->data);
76        }
77
78        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
79      }
80
81      if (outputResult)
82      {
83        if(building_graph)
84        {
85          this->filterID = InvalidableObject::filterIdGenerator++;
86          CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]);   
87          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;   
88          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ;
89
90          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->recordXiosAttributes();
91          if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->recordXiosAttributes();
92
93
94          for(int i=0; i<this->filterIDoutputs_pair.size(); i++)
95          {
96            (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterIDoutputs_pair[i].second].filter_filled = 0 ; 
97            (*CWorkflowGraph::mapFieldToFilters_ptr_with_info)[this->filterIDoutputs_pair[i].first].to = this->filterID ; 
98          }
99
100          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb = this->filterIDoutputs_pair.size() ;
101        }
102
103
104
105        nbOperationDates ++;
106        if (!copyLess)
107        {
108          functor->final();
109
110          packet = CDataPacketPtr(new CDataPacket);
111          packet->date = data[0]->date;
112          packet->timestamp = data[0]->timestamp;
113          packet->status = data[0]->status;
114          packet->data.resize(tmpData.numElements());
115          packet->data = tmpData;
116        }
117        else
118          packet = data[0];
119
120        isFirstOperation = false;
121       
122        packet->field = this->field;
123        if(building_graph) packet->src_filterID=this->filterID;
124        if(building_graph) this->filterIDoutputs_pair.clear();
125        if(building_graph) CWorkflowGraph::build_begin=true;
126      }
127    }
128
129    return packet;
130  }
131
132
133
134  CDataPacketPtr CTemporalFilter::apply_old(std::vector<CDataPacketPtr> data)
135  {
136    bool building_graph=this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
137   
138    if(building_graph)
139    {
140      if(this->filterIDoutputs.size()==0) this->filterID = InvalidableObject::filterIdGenerator++;
141      int edgeID = InvalidableObject::edgeIdGenerator++;
142     
143      // std::cout<<"CTemporalFilter::apply filter tag = "<<this->tag<<" start = "<<this->start_graph<<" end = "<<this->end_graph<<std::endl;
144
145      CWorkflowGraph::allocNodeEdge();
146
147      if(this->filterIDoutputs.size()==0)
148      {
149        CWorkflowGraph::addNode(this->filterID, "Temporal Filter\\n("+this->temp_op+")", 5, 1, 0, data[0]);   
150        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].transform_type = this->temp_op;   
151        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = false ;
152      }
153
154      if(CWorkflowGraph::build_begin)
155      {
156
157        CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
158
159        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;
160        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb += 1 ;
161      }
162
163
164      this->filterIDoutputs.push_back(data[0]->src_filterID); 
165    }
166   
167   
168    CDataPacketPtr packet;
169
170    if (data[0]->status != CDataPacket::END_OF_STREAM)
171    {
172      bool usePacket, outputResult, copyLess;
173      if (isOnceOperation)
174        usePacket = outputResult = copyLess = isFirstOperation;
175      else
176      {
177        usePacket = (data[0]->date >= nextSamplingDate);
178        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
179        copyLess = (isInstantOperation && usePacket && outputResult);
180      }
181
182      if (usePacket)
183      {
184        nbSamplingDates ++;
185        if (!copyLess)
186        {
187          if (!tmpData.numElements())
188            tmpData.resize(data[0]->data.numElements());
189
190          (*functor)(data[0]->data);
191        }
192
193        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
194      }
195
196      if (outputResult)
197      {
198        nbOperationDates ++;
199        if (!copyLess)
200        {
201          functor->final();
202
203          packet = CDataPacketPtr(new CDataPacket);
204          packet->date = data[0]->date;
205          packet->timestamp = data[0]->timestamp;
206          packet->status = data[0]->status;
207          packet->data.resize(tmpData.numElements());
208          packet->data = tmpData;
209        }
210        else
211          packet = data[0];
212
213        isFirstOperation = false;
214       
215        packet->field = this->field;
216        if(building_graph) packet->src_filterID=this->filterID;
217        if(building_graph) this->filterIDoutputs.clear();
218        if(building_graph) CWorkflowGraph::build_begin=true;
219        if(building_graph) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].inputs_complete = true ;
220      }
221    }
222
223    return packet;
224  }
225
226  bool CTemporalFilter::mustAutoTrigger() const
227  {
228    return true;
229  }
230
231  bool CTemporalFilter::isDataExpected(const CDate& date) const
232  {
233//    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate);
234    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
235  }
236
237  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
238  {
239    func::CFunctor* functor = NULL;
240
241    double defaultValue = std::numeric_limits<double>::quiet_NaN();
242
243#define DECLARE_FUNCTOR(MType, mtype) \
244    if (opId.compare(#mtype) == 0) \
245    { \
246      if (ignoreMissingValue) \
247      { \
248        functor = new func::C##MType(tmpData, defaultValue); \
249      } \
250      else \
251      { \
252        functor = new func::C##MType(tmpData); \
253      } \
254    }
255
256#include "functor_type.conf"
257
258    if (!functor)
259      ERROR("createFunctor(const std::string& opId, ...)",
260            << "\"" << opId << "\" is not a valid operation.");
261
262    return functor;
263  }
264} // namespace xios
Note: See TracBrowser for help on using the repository browser.