source: XIOS/dev/XIOS_DEV_CMIP6/src/filter/source_filter.cpp @ 1250

Last change on this file since 1250 was 1250, checked in by mhnguyen, 7 years ago

Fixing bug on mask grid

+) Add mask_0d for scalar grid
+) Transmit grid's attributes (mask) from client and reconstruct them correctly on server
+) Rebuild data in the input of data flow on the server side

Test
+) On Curie
+) Simple test

File size: 4.5 KB
RevLine 
[638]1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
[756]4#include "calendar_util.hpp"
[1158]5#include <limits>
[638]6
7namespace xios
8{
[1241]9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, bool compression, 
[1158]10                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
11                               bool hasMissingValue /*= false*/,
12                               double defaultValue /*= 0.0*/)
[1021]13    : COutputPin(gc, manualTrigger)
14    , grid(grid)
[1241]15    , compression(compression)
[756]16    , offset(offset)
[1158]17    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
[638]18  {
19    if (!grid)
20      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
21            "Impossible to construct a source filter without providing a grid.");
22  }
23
24  template <int N>
[643]25  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
[638]26  {
[756]27    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
28
[638]29    CDataPacketPtr packet(new CDataPacket);
[643]30    packet->date = date;
31    packet->timestamp = date;
[638]32    packet->status = CDataPacket::NO_ERROR;
33
[1250]34    packet->data.resize(grid->storeIndex_client.numElements());   
35   
36    if (compression)
37    {
38      packet->data = defaultValue;
39      grid->uncompressField(data, packet->data);   
40    }
41    else
42      grid->inputField(data, packet->data);
43
44   
45   
[1249]46    // if (compression) grid->inputField(data, packet->data) ;
47    // else
48    // {
49    //   // just make a flat copy
50    //   CArray<double, N> data_tmp(data.copy()) ; // supress const attribute
51    //   CArray<double,1> dataTmp2(data_tmp.dataFirst(),shape(data.numElements()),neverDeleteData) ;
52    //   packet->data = dataTmp2 ;
53    // }
[1158]54    // Convert missing values to NaN
55    if (hasMissingValue)
56    {
[1201]57      const double nanValue = std::numeric_limits<double>::quiet_NaN();
58      const size_t nbData = packet->data.numElements();
[1158]59      for (size_t idx = 0; idx < nbData; ++idx)
60      {
61        if (defaultValue == packet->data(idx))
62          packet->data(idx) = nanValue;
63      }
64    }
65
[1021]66    onOutputReady(packet);
[638]67  }
68
// Explicit instantiations of CSourceFilter::streamData for N = 1 through 7; the template body is defined in this file, so these are the versions emitted here.
[643]69  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
70  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
71  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
[932]72  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
73  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
74  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
75  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
[638]76
[643]77  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
[638]78  {
[756]79    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
80
[638]81    CDataPacketPtr packet(new CDataPacket);
[643]82    packet->date = date;
83    packet->timestamp = date;
[638]84    packet->status = CDataPacket::NO_ERROR;
[1249]85   
[1021]86    if (data.size() != grid->storeIndex_fromSrv.size())
[643]87      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
[638]88            << "Incoherent data received from servers,"
[1021]89            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
[638]90
91    packet->data.resize(grid->storeIndex_client.numElements());
92    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
93    for (it = data.begin(); it != itEnd; it++)
[1249]94    {     
[1021]95      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
[638]96      for (int n = 0; n < index.numElements(); n++)
97        packet->data(index(n)) = it->second(n);
98    }
99
[1201]100    // Convert missing values to NaN
101    if (hasMissingValue)
102    {
103      const double nanValue = std::numeric_limits<double>::quiet_NaN();
104      const size_t nbData = packet->data.numElements();
105      for (size_t idx = 0; idx < nbData; ++idx)
106      {
107        if (defaultValue == packet->data(idx))
108          packet->data(idx) = nanValue;
109      }
110    }
111
[1021]112    onOutputReady(packet);
[638]113  }
114
[643]115  void CSourceFilter::signalEndOfStream(CDate date)
[638]116  {
[1210]117    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
118
[638]119    CDataPacketPtr packet(new CDataPacket);
[643]120    packet->date = date;
121    packet->timestamp = date;
[638]122    packet->status = CDataPacket::END_OF_STREAM;
[1021]123    onOutputReady(packet);
[638]124  }
125} // namespace xios
Note: See TracBrowser for help on using the repository browser.