source: XIOS/dev/dev_trunk_omp/src/filter/source_filter.cpp @ 1670

Last change on this file since 1670 was 1668, checked in by yushan, 5 years ago

MARK: branch merged with trunk @1663. static graph OK with EP

File size: 4.4 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/,
15                               bool buildWorkflowGraph /*= false*/)
16    : COutputPin(gc, manualTrigger, buildWorkflowGraph)
17    , grid(grid)
18    , compression(compression)
19    , mask(mask)
20    , offset(offset)
21    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
22  {
23    if (!grid)
24      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
25            "Impossible to construct a source filter without providing a grid.");
26  }
27
28  template <int N>
29  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
30  {
31    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
32
33    CDataPacketPtr packet(new CDataPacket);
34    packet->date = date;
35    packet->timestamp = date;
36    packet->status = CDataPacket::NO_ERROR;
37
38    packet->data.resize(grid->storeIndex_client.numElements());   
39   
40    if (compression)
41    {
42      packet->data = defaultValue;
43      grid->uncompressField(data, packet->data);   
44    }
45    else
46    {
47      if (mask)
48        grid->maskField(data, packet->data);
49      else
50        grid->inputField(data, packet->data);
51    }
52    // Convert missing values to NaN
53    if (hasMissingValue)
54    {
55      const double nanValue = std::numeric_limits<double>::quiet_NaN();
56      const size_t nbData = packet->data.numElements();
57      for (size_t idx = 0; idx < nbData; ++idx)
58      {
59        if (defaultValue == packet->data(idx))
60          packet->data(idx) = nanValue;
61      }
62    }
63
64    onOutputReady(packet);
65  }
66
67  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
68  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
69  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
70  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
71  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
72  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
73  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
74
75  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
76  {
77    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
78
79    CDataPacketPtr packet(new CDataPacket);
80    packet->date = date;
81    packet->timestamp = date;
82    packet->status = CDataPacket::NO_ERROR;
83   
84    if (data.size() != grid->storeIndex_fromSrv.size())
85      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
86            << "Incoherent data received from servers,"
87            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
88
89    packet->data.resize(grid->storeIndex_client.numElements());
90    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
91    for (it = data.begin(); it != itEnd; it++)
92    {     
93      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
94      for (int n = 0; n < index.numElements(); n++)
95        packet->data(index(n)) = it->second(n);
96    }
97
98    // Convert missing values to NaN
99    if (hasMissingValue)
100    {
101      const double nanValue = std::numeric_limits<double>::quiet_NaN();
102      const size_t nbData = packet->data.numElements();
103      for (size_t idx = 0; idx < nbData; ++idx)
104      {
105        if (defaultValue == packet->data(idx))
106          packet->data(idx) = nanValue;
107      }
108    }
109
110    onOutputReady(packet);
111  }
112
113  void CSourceFilter::signalEndOfStream(CDate date)
114  {
115    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
116
117    CDataPacketPtr packet(new CDataPacket);
118    packet->date = date;
119    packet->timestamp = date;
120    packet->status = CDataPacket::END_OF_STREAM;
121    onOutputReady(packet);
122  }
123} // namespace xios
Note: See TracBrowser for help on using the repository browser.