source: XIOS/dev/dev_trunk_omp/src/filter/source_filter.cpp @ 1677

Last change on this file since 1677 was 1677, checked in by yushan, 5 years ago

MARK: Dynamic workflow graph developement. Branch up to date with trunk @1663.

File size: 7.3 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/,
15                               bool buildWorkflowGraph /*= false*/)
16    : COutputPin(gc, manualTrigger, buildWorkflowGraph)
17    , grid(grid)
18    , compression(compression)
19    , mask(mask)
20    , offset(offset)
21    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
22  {
23    if (!grid)
24      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
25            "Impossible to construct a source filter without providing a grid.");
26  }
27
28 
29  template <int N>
30  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data, const StdString field_id)
31  {
32    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
33
34    CDataPacketPtr packet(new CDataPacket);
35    packet->date = date;
36    packet->timestamp = date;
37    packet->status = CDataPacket::NO_ERROR;
38
39    packet->data.resize(grid->storeIndex_client.numElements());   
40   
41    if (compression)
42    {
43      packet->data = defaultValue;
44      grid->uncompressField(data, packet->data);   
45    }
46    else
47    {
48      if (mask)
49        grid->maskField(data, packet->data);
50      else
51        grid->inputField(data, packet->data);
52    }
53    // Convert missing values to NaN
54    if (hasMissingValue)
55    {
56      const double nanValue = std::numeric_limits<double>::quiet_NaN();
57      const size_t nbData = packet->data.numElements();
58      for (size_t idx = 0; idx < nbData; ++idx)
59      {
60        if (defaultValue == packet->data(idx))
61          packet->data(idx) = nanValue;
62      }
63    }
64
65    packet->fieldID = field_id;
66    this->output_field_id = field_id;
67
68
69    if(this->tag)
70    {
71      this->filterID.first = InvalidableObject::filterIdGenerator++; 
72      packet->src_filterID=this->filterID.first;
73     
74     
75      if(CXios::isClient) std::cout<<"source filter tag = "<<this->tag<<std::endl;
76   
77      if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>;
78
79      (*CWorkflowGraph::mapFilters_ptr)[this->filterID.first] = "Source Filter";
80
81      if(CWorkflowGraph::mapFieldToFilters_ptr==0) CWorkflowGraph::mapFieldToFilters_ptr = new std::unordered_map <StdString, vector <int> >;
82
83
84    }
85
86
87    onOutputReady(packet);
88  }
89
90  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data, const StdString field_id);
91  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data, const StdString field_id);
92  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data, const StdString field_id);
93  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data, const StdString field_id);
94  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data, const StdString field_id);
95  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data, const StdString field_id);
96  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data, const StdString field_id);
97
98
99  template <int N>
100  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
101  {
102    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
103
104    CDataPacketPtr packet(new CDataPacket);
105    packet->date = date;
106    packet->timestamp = date;
107    packet->status = CDataPacket::NO_ERROR;
108
109    packet->data.resize(grid->storeIndex_client.numElements());   
110   
111    if (compression)
112    {
113      packet->data = defaultValue;
114      grid->uncompressField(data, packet->data);   
115    }
116    else
117    {
118      if (mask)
119        grid->maskField(data, packet->data);
120      else
121        grid->inputField(data, packet->data);
122    }
123    // Convert missing values to NaN
124    if (hasMissingValue)
125    {
126      const double nanValue = std::numeric_limits<double>::quiet_NaN();
127      const size_t nbData = packet->data.numElements();
128      for (size_t idx = 0; idx < nbData; ++idx)
129      {
130        if (defaultValue == packet->data(idx))
131          packet->data(idx) = nanValue;
132      }
133    }
134    this->filterID.first = InvalidableObject::filterIdGenerator++;
135    packet->src_filterID=this->filterID.first;
136
137    if(CXios::isClient) std::cout<<"source filter filter tag = "<<this->tag<<std::endl;
138   
139    if(CWorkflowGraph::mapFilters_ptr==0) CWorkflowGraph::mapFilters_ptr = new std::unordered_map <int, StdString>;
140
141    (*CWorkflowGraph::mapFilters_ptr)[this->filterID.first] = "Source Filter";
142
143
144
145    onOutputReady(packet);
146  }
147
148  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
149  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
150  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
151  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
152  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
153  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
154  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
155
156
157
158  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
159  {
160    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
161
162    CDataPacketPtr packet(new CDataPacket);
163    packet->date = date;
164    packet->timestamp = date;
165    packet->status = CDataPacket::NO_ERROR;
166   
167    if (data.size() != grid->storeIndex_fromSrv.size())
168      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
169            << "Incoherent data received from servers,"
170            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
171
172    packet->data.resize(grid->storeIndex_client.numElements());
173    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
174    for (it = data.begin(); it != itEnd; it++)
175    {     
176      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
177      for (int n = 0; n < index.numElements(); n++)
178        packet->data(index(n)) = it->second(n);
179    }
180
181    // Convert missing values to NaN
182    if (hasMissingValue)
183    {
184      const double nanValue = std::numeric_limits<double>::quiet_NaN();
185      const size_t nbData = packet->data.numElements();
186      for (size_t idx = 0; idx < nbData; ++idx)
187      {
188        if (defaultValue == packet->data(idx))
189          packet->data(idx) = nanValue;
190      }
191    }
192
193    onOutputReady(packet);
194  }
195
196  void CSourceFilter::signalEndOfStream(CDate date)
197  {
198    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
199
200    CDataPacketPtr packet(new CDataPacket);
201    packet->date = date;
202    packet->timestamp = date;
203    packet->status = CDataPacket::END_OF_STREAM;
204    onOutputReady(packet);
205  }
206
207 
208} // namespace xios
Note: See TracBrowser for help on using the repository browser.