source: XIOS/dev/XIOS_DEV_CMIP6/src/filter/source_filter.cpp @ 1249

Last change on this file since 1249 was 1249, checked in by mhnguyen, 7 years ago

Various bug fixes on mask and zoom

+) Rearrange the local index on the receiving side to be coherent with the global index
+) Include masking information in compress data (data_index) on the receiving side
+) Correct zoom so that it works when several (but not all) processes participate in writing data

Test
+) On Curie
+) Simple test

File size: 4.3 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6
7namespace xios
8{
9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, bool compression, 
10                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
11                               bool hasMissingValue /*= false*/,
12                               double defaultValue /*= 0.0*/)
13    : COutputPin(gc, manualTrigger)
14    , grid(grid)
15    , compression(compression)
16    , offset(offset)
17    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
18  {
19    if (!grid)
20      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
21            "Impossible to construct a source filter without providing a grid.");
22  }
23
24  template <int N>
25  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
26  {
27    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
28
29    CDataPacketPtr packet(new CDataPacket);
30    packet->date = date;
31    packet->timestamp = date;
32    packet->status = CDataPacket::NO_ERROR;
33
34    packet->data.resize(grid->storeIndex_client.numElements());
35    grid->inputField(data, packet->data) ;
36    // if (compression) grid->inputField(data, packet->data) ;
37    // else
38    // {
39    //   // just make a flat copy
40    //   CArray<double, N> data_tmp(data.copy()) ; // supress const attribute
41    //   CArray<double,1> dataTmp2(data_tmp.dataFirst(),shape(data.numElements()),neverDeleteData) ;
42    //   packet->data = dataTmp2 ;
43    // }
44    // Convert missing values to NaN
45    if (hasMissingValue)
46    {
47      const double nanValue = std::numeric_limits<double>::quiet_NaN();
48      const size_t nbData = packet->data.numElements();
49      for (size_t idx = 0; idx < nbData; ++idx)
50      {
51        if (defaultValue == packet->data(idx))
52          packet->data(idx) = nanValue;
53      }
54    }
55
56    onOutputReady(packet);
57  }
58
59  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
60  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
61  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
62  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
63  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
64  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
65  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
66
67  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
68  {
69    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
70
71    CDataPacketPtr packet(new CDataPacket);
72    packet->date = date;
73    packet->timestamp = date;
74    packet->status = CDataPacket::NO_ERROR;
75   
76    if (data.size() != grid->storeIndex_fromSrv.size())
77      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
78            << "Incoherent data received from servers,"
79            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
80
81    packet->data.resize(grid->storeIndex_client.numElements());
82    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
83    for (it = data.begin(); it != itEnd; it++)
84    {     
85      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
86      for (int n = 0; n < index.numElements(); n++)
87        packet->data(index(n)) = it->second(n);
88    }
89
90    // Convert missing values to NaN
91    if (hasMissingValue)
92    {
93      const double nanValue = std::numeric_limits<double>::quiet_NaN();
94      const size_t nbData = packet->data.numElements();
95      for (size_t idx = 0; idx < nbData; ++idx)
96      {
97        if (defaultValue == packet->data(idx))
98          packet->data(idx) = nanValue;
99      }
100    }
101
102    onOutputReady(packet);
103  }
104
105  void CSourceFilter::signalEndOfStream(CDate date)
106  {
107    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
108
109    CDataPacketPtr packet(new CDataPacket);
110    packet->date = date;
111    packet->timestamp = date;
112    packet->status = CDataPacket::END_OF_STREAM;
113    onOutputReady(packet);
114  }
115} // namespace xios
Note: See TracBrowser for help on using the repository browser.