source: XIOS/dev/XIOS_DEV_CMIP6/src/filter/source_filter.cpp @ 1250

Last change on this file since 1250 was 1250, checked in by mhnguyen, 7 years ago

Fixing bug on mask grid

+) Add mask_0d for scalar grid
+) Transmit grid's attributes (mask) from client and reconstruct them correctly on server
+) Rebuild data in the input of data flow on the server side

Test
+) On Curie
+) Simple test

File size: 4.5 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6
7namespace xios
8{
9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, bool compression, 
10                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
11                               bool hasMissingValue /*= false*/,
12                               double defaultValue /*= 0.0*/)
13    : COutputPin(gc, manualTrigger)
14    , grid(grid)
15    , compression(compression)
16    , offset(offset)
17    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
18  {
19    if (!grid)
20      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
21            "Impossible to construct a source filter without providing a grid.");
22  }
23
24  template <int N>
25  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
26  {
27    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
28
29    CDataPacketPtr packet(new CDataPacket);
30    packet->date = date;
31    packet->timestamp = date;
32    packet->status = CDataPacket::NO_ERROR;
33
34    packet->data.resize(grid->storeIndex_client.numElements());   
35   
36    if (compression)
37    {
38      packet->data = defaultValue;
39      grid->uncompressField(data, packet->data);   
40    }
41    else
42      grid->inputField(data, packet->data);
43
44   
45   
46    // if (compression) grid->inputField(data, packet->data) ;
47    // else
48    // {
49    //   // just make a flat copy
50    //   CArray<double, N> data_tmp(data.copy()) ; // supress const attribute
51    //   CArray<double,1> dataTmp2(data_tmp.dataFirst(),shape(data.numElements()),neverDeleteData) ;
52    //   packet->data = dataTmp2 ;
53    // }
54    // Convert missing values to NaN
55    if (hasMissingValue)
56    {
57      const double nanValue = std::numeric_limits<double>::quiet_NaN();
58      const size_t nbData = packet->data.numElements();
59      for (size_t idx = 0; idx < nbData; ++idx)
60      {
61        if (defaultValue == packet->data(idx))
62          packet->data(idx) = nanValue;
63      }
64    }
65
66    onOutputReady(packet);
67  }
68
69  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
70  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
71  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
72  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
73  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
74  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
75  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
76
77  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
78  {
79    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
80
81    CDataPacketPtr packet(new CDataPacket);
82    packet->date = date;
83    packet->timestamp = date;
84    packet->status = CDataPacket::NO_ERROR;
85   
86    if (data.size() != grid->storeIndex_fromSrv.size())
87      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
88            << "Incoherent data received from servers,"
89            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
90
91    packet->data.resize(grid->storeIndex_client.numElements());
92    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
93    for (it = data.begin(); it != itEnd; it++)
94    {     
95      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
96      for (int n = 0; n < index.numElements(); n++)
97        packet->data(index(n)) = it->second(n);
98    }
99
100    // Convert missing values to NaN
101    if (hasMissingValue)
102    {
103      const double nanValue = std::numeric_limits<double>::quiet_NaN();
104      const size_t nbData = packet->data.numElements();
105      for (size_t idx = 0; idx < nbData; ++idx)
106      {
107        if (defaultValue == packet->data(idx))
108          packet->data(idx) = nanValue;
109      }
110    }
111
112    onOutputReady(packet);
113  }
114
115  void CSourceFilter::signalEndOfStream(CDate date)
116  {
117    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
118
119    CDataPacketPtr packet(new CDataPacket);
120    packet->date = date;
121    packet->timestamp = date;
122    packet->status = CDataPacket::END_OF_STREAM;
123    onOutputReady(packet);
124  }
125} // namespace xios
Note: See TracBrowser for help on using the repository browser.