source: XIOS/dev/XIOS_DEV_CMIP6/src/filter/source_filter.cpp @ 1241

Last change on this file since 1241 was 1241, checked in by ymipsl, 7 years ago

Fix a problem with incoming data from client to server when some masking is present.
The source filter on the server side tried to compress the data, but the data are already compressed by the client. So just make a flat copy and feed it into the workflow.

Now data compression is not called on server side.

YM

File size: 4.3 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6
7namespace xios
8{
9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, bool compression, 
10                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
11                               bool hasMissingValue /*= false*/,
12                               double defaultValue /*= 0.0*/)
13    : COutputPin(gc, manualTrigger)
14    , grid(grid)
15    , compression(compression)
16    , offset(offset)
17    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
18  {
19    if (!grid)
20      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
21            "Impossible to construct a source filter without providing a grid.");
22  }
23
24  template <int N>
25  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
26  {
27    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
28
29    CDataPacketPtr packet(new CDataPacket);
30    packet->date = date;
31    packet->timestamp = date;
32    packet->status = CDataPacket::NO_ERROR;
33
34    packet->data.resize(grid->storeIndex_client.numElements());
35    if (compression) grid->inputField(data, packet->data) ;
36    else
37    {
38      // just make a flat copy
39      CArray<double,1> dataTmp(data.copy().dataFirst(),shape(data.numElements())) ;
40      packet->data.reference(dataTmp) ;
41    }
42    // Convert missing values to NaN
43    if (hasMissingValue)
44    {
45      const double nanValue = std::numeric_limits<double>::quiet_NaN();
46      const size_t nbData = packet->data.numElements();
47      for (size_t idx = 0; idx < nbData; ++idx)
48      {
49        if (defaultValue == packet->data(idx))
50          packet->data(idx) = nanValue;
51      }
52    }
53
54    onOutputReady(packet);
55  }
56
57  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
58  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
59  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
60  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
61  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
62  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
63  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
64
65  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
66  {
67    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
68
69    CDataPacketPtr packet(new CDataPacket);
70    packet->date = date;
71    packet->timestamp = date;
72    packet->status = CDataPacket::NO_ERROR;
73
74    // if (data.size() != grid->storeIndex_toSrv.size())
75    if (data.size() != grid->storeIndex_fromSrv.size())
76      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
77            << "Incoherent data received from servers,"
78            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
79
80    packet->data.resize(grid->storeIndex_client.numElements());
81    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
82    for (it = data.begin(); it != itEnd; it++)
83    {
84      // CArray<int,1>& index = grid->storeIndex_toSrv[it->first];
85      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
86      for (int n = 0; n < index.numElements(); n++)
87        packet->data(index(n)) = it->second(n);
88    }
89
90    // Convert missing values to NaN
91    if (hasMissingValue)
92    {
93      const double nanValue = std::numeric_limits<double>::quiet_NaN();
94      const size_t nbData = packet->data.numElements();
95      for (size_t idx = 0; idx < nbData; ++idx)
96      {
97        if (defaultValue == packet->data(idx))
98          packet->data(idx) = nanValue;
99      }
100    }
101
102    onOutputReady(packet);
103  }
104
105  void CSourceFilter::signalEndOfStream(CDate date)
106  {
107    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
108
109    CDataPacketPtr packet(new CDataPacket);
110    packet->date = date;
111    packet->timestamp = date;
112    packet->status = CDataPacket::END_OF_STREAM;
113    onOutputReady(packet);
114  }
115} // namespace xios
Note: See TracBrowser for help on using the repository browser.