source: XIOS/dev/branch_openmp/src/filter/source_filter.cpp @ 1328

Last change on this file since 1328 was 1328, checked in by yushan, 6 years ago

dev_omp

File size: 4.0 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6
7namespace xios
8{
9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
10                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
11                               bool hasMissingValue /*= false*/,
12                               double defaultValue /*= 0.0*/)
13    : COutputPin(gc, manualTrigger)
14    , grid(grid)
15    , offset(offset)
16    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
17  {
18    if (!grid)
19      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
20            "Impossible to construct a source filter without providing a grid.");
21  }
22
23  template <int N>
24  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
25  {
26    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
27
28    CDataPacketPtr packet(new CDataPacket);
29    packet->date = date;
30    packet->timestamp = date;
31    packet->status = CDataPacket::NO_ERROR;
32
33    packet->data.resize(grid->storeIndex_client.numElements());
34    grid->inputField(data, packet->data);
35
36    // Convert missing values to NaN
37    if (hasMissingValue)
38    {
39      const double nanValue = std::numeric_limits<double>::quiet_NaN();
40      const size_t nbData = packet->data.numElements();
41      for (size_t idx = 0; idx < nbData; ++idx)
42      {
43        if (defaultValue == packet->data(idx))
44          packet->data(idx) = nanValue;
45      }
46    }
47
48    onOutputReady(packet);
49  }
50
51  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
52  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
53  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
54  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
55  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
56  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
57  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
58
59  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
60  {
61    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
62
63    CDataPacketPtr packet(new CDataPacket);
64    packet->date = date;
65    packet->timestamp = date;
66    packet->status = CDataPacket::NO_ERROR;
67
68    // if (data.size() != grid->storeIndex_toSrv.size())
69    if (data.size() != grid->storeIndex_fromSrv.size())
70      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
71            << "Incoherent data received from servers,"
72            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
73
74    packet->data.resize(grid->storeIndex_client.numElements());
75    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
76    for (it = data.begin(); it != itEnd; it++)
77    {
78      // CArray<int,1>& index = grid->storeIndex_toSrv[it->first];
79      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
80      for (int n = 0; n < index.numElements(); n++)
81        packet->data(index(n)) = it->second(n);
82    }
83
84    // Convert missing values to NaN
85    if (hasMissingValue)
86    {
87      const double nanValue = std::numeric_limits<double>::quiet_NaN();
88      const size_t nbData = packet->data.numElements();
89      for (size_t idx = 0; idx < nbData; ++idx)
90      {
91        if (defaultValue == packet->data(idx))
92          packet->data(idx) = nanValue;
93      }
94    }
95
96    onOutputReady(packet);
97  }
98
99  void CSourceFilter::signalEndOfStream(CDate date)
100  {
101    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
102
103    CDataPacketPtr packet(new CDataPacket);
104    packet->date = date;
105    packet->timestamp = date;
106    packet->status = CDataPacket::END_OF_STREAM;
107    onOutputReady(packet);
108  }
109} // namespace xios
Note: See TracBrowser for help on using the repository browser.