source: XIOS3/trunk/src/filter/temporal_filter.cpp

Last change on this file was 2628, checked in by jderouillat, 3 months ago

New timers integration/reporting

File size: 5.9 KB
Line 
1#include "temporal_filter.hpp"
2#include "functor_type.hpp"
3#include "calendar_util.hpp"
4#include "workflow_graph.hpp"
5#include "mem_checker.hpp"
6#include "timer.hpp"
7
8namespace xios
9{
10  extern CLogType logProfile ;
11  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData);
12
13  CTemporalFilter::CTemporalFilter(CGarbageCollector& gc, const std::string& opId,
14                                   const CDate& initDate, const CDuration samplingFreq, const CDuration samplingOffset, const CDuration opFreq,
15                                   bool ignoreMissingValue /*= false*/)
16    : CFilter(gc, 1, this)
17    , functor(createFunctor(opId, ignoreMissingValue, tmpData))
18    , isOnceOperation(functor->timeType() == func::CFunctor::once)
19    , isInstantOperation(functor->timeType() == func::CFunctor::instant)
20    , samplingFreq(samplingFreq)
21    , samplingOffset(samplingOffset)
22    , opFreq(opFreq)
23    , offsetMonth(0, this->samplingOffset.month, 0, 0, 0, 0, 0)
24    , offsetAllButMonth(this->samplingOffset.year, 0 , this->samplingOffset.day,
25                        this->samplingOffset.hour, this->samplingOffset.minute,
26                        this->samplingOffset.second, this->samplingOffset.timestep)
27    , initDate(initDate)
28//    , nextSamplingDate(initDate + (this->samplingOffset + initDate.getRelCalendar().getTimeStep()))
29    , nextSamplingDate(initDate + offsetMonth + ( offsetAllButMonth + initDate.getRelCalendar().getTimeStep()))
30    , nbOperationDates(1)
31    , nbSamplingDates(0)
32//    , nextOperationDate(initDate + opFreq + this->samplingOffset)
33    , isFirstOperation(true)
34    , graphCycleCompleted(true)
35    , temporalOperation(opId)
36  {
37  }
38
39  std::string CTemporalFilter::getTemporalOperation()
40  {
41    return this->temporalOperation;
42  }
43
44  void CTemporalFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data)
45  {
46    if(this->graphEnabled )
47    {
48      if(!data[0]->graphPackage)
49      {
50        data[0]->graphPackage = new CGraphDataPackage;
51      }
52     
53      if(graphCycleCompleted)
54      { 
55        this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
56        CWorkflowGraph::addNode("Temporal filter \\n("+getTemporalOperation()+")", 3, false, 0, data[0]);
57        graphCycleCompleted = false;
58      }
59     
60      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
61      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
62     
63      CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]);
64      data[0]->graphPackage->fromFilter = this->graphPackage->filterId;
65      // this->graphPackage->sourceFilterIds.push_back(data[0]->graphPackage->fromFilter);
66      data[0]->graphPackage->currentField = this->graphPackage->inFields[0];
67      std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end());
68    }
69
70  }
71 
72  CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data)
73  {
74    buildWorkflowGraph(data);
75 
76    CDataPacketPtr packet;
77
78    if (data[0]->status != CDataPacket::END_OF_STREAM)
79    {
80      if (info.isActive(logProfile)) CTimer::get("Temporal filters").resume() ;
81      bool usePacket, outputResult, copyLess;
82      if (isOnceOperation)
83        usePacket = outputResult = copyLess = isFirstOperation;
84      else
85      {
86        usePacket = (data[0]->date >= nextSamplingDate);
87        outputResult = (data[0]->date  > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
88        copyLess = (isInstantOperation && usePacket && outputResult);
89      }
90
91      if (usePacket)
92      {
93        nbSamplingDates ++;
94        if (!copyLess)
95        {
96          if (!tmpData.numElements())
97            tmpData.resize(data[0]->data.numElements());
98
99          (*functor)(data[0]->data);
100        }
101
102        nextSamplingDate = ((initDate + offsetMonth) + nbSamplingDates * samplingFreq) + offsetAllButMonth + initDate.getRelCalendar().getTimeStep();
103      }
104
105      if (outputResult)
106      {
107        nbOperationDates ++;
108        if (!copyLess)
109        {
110          functor->final();
111
112          packet = CDataPacketPtr(new CDataPacket);
113          packet->date = data[0]->date;
114          packet->timestamp = data[0]->timestamp;
115          packet->status = data[0]->status;
116          packet->data.resize(tmpData.numElements());
117          packet->data = tmpData;
118          packet->graphPackage = data[0]->graphPackage;
119        }
120        else
121          packet = data[0];
122
123        CMemChecker::logMem( "CTemporalFilter::apply" );
124
125        isFirstOperation = false;
126        graphCycleCompleted = true;
127     }
128     if (info.isActive(logProfile)) CTimer::get("Temporal filters").suspend() ;
129    }
130
131    return packet;
132  }
133
134  bool CTemporalFilter::mustAutoTrigger() const
135  {
136    return true;
137  }
138
139  bool CTemporalFilter::isDataExpected(const CDate& date) const
140  {
141//    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date + samplingFreq > nextOperationDate);
142    return isOnceOperation ? isFirstOperation : (date >= nextSamplingDate || date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth);
143  }
144
145  static func::CFunctor* createFunctor(const std::string& opId, bool ignoreMissingValue, CArray<double, 1>& tmpData)
146  {
147    func::CFunctor* functor = NULL;
148
149    double defaultValue = std::numeric_limits<double>::quiet_NaN();
150
151#define DECLARE_FUNCTOR(MType, mtype) \
152    if (opId.compare(#mtype) == 0) \
153    { \
154      if (ignoreMissingValue) \
155      { \
156        functor = new func::C##MType(tmpData, defaultValue); \
157      } \
158      else \
159      { \
160        functor = new func::C##MType(tmpData); \
161      } \
162    }
163
164#include "functor_type.conf"
165
166    if (!functor)
167      ERROR("createFunctor(const std::string& opId, ...)",
168            << "\"" << opId << "\" is not a valid operation.");
169
170    return functor;
171  }
172} // namespace xios
Note: See TracBrowser for help on using the repository browser.