source: XIOS/dev/branch_openmp/src/filter/source_filter.cpp @ 1328

Last change on this file since 1328 was 1328, checked in by yushan, 6 years ago

dev_omp

File size: 4.0 KB
RevLine 
[638]1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
[756]4#include "calendar_util.hpp"
[1018]5#include <limits>
[638]6
7namespace xios
8{
[1006]9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
[1018]10                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
11                               bool hasMissingValue /*= false*/,
12                               double defaultValue /*= 0.0*/)
[1006]13    : COutputPin(gc, manualTrigger)
14    , grid(grid)
[756]15    , offset(offset)
[1018]16    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
[638]17  {
18    if (!grid)
19      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
20            "Impossible to construct a source filter without providing a grid.");
21  }
22
23  template <int N>
[643]24  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
[638]25  {
[756]26    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
27
[638]28    CDataPacketPtr packet(new CDataPacket);
[643]29    packet->date = date;
30    packet->timestamp = date;
[638]31    packet->status = CDataPacket::NO_ERROR;
32
33    packet->data.resize(grid->storeIndex_client.numElements());
34    grid->inputField(data, packet->data);
35
[1018]36    // Convert missing values to NaN
37    if (hasMissingValue)
38    {
[1205]39      const double nanValue = std::numeric_limits<double>::quiet_NaN();
40      const size_t nbData = packet->data.numElements();
[1018]41      for (size_t idx = 0; idx < nbData; ++idx)
42      {
43        if (defaultValue == packet->data(idx))
44          packet->data(idx) = nanValue;
45      }
46    }
47
[1006]48    onOutputReady(packet);
[638]49  }
50
[643]51  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
52  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
53  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
[932]54  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
55  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
56  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
57  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
[638]58
[643]59  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
[638]60  {
[756]61    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
62
[638]63    CDataPacketPtr packet(new CDataPacket);
[643]64    packet->date = date;
65    packet->timestamp = date;
[638]66    packet->status = CDataPacket::NO_ERROR;
67
[988]68    // if (data.size() != grid->storeIndex_toSrv.size())
69    if (data.size() != grid->storeIndex_fromSrv.size())
[643]70      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
[638]71            << "Incoherent data received from servers,"
[988]72            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
[638]73
74    packet->data.resize(grid->storeIndex_client.numElements());
75    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
76    for (it = data.begin(); it != itEnd; it++)
77    {
[988]78      // CArray<int,1>& index = grid->storeIndex_toSrv[it->first];
79      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
[638]80      for (int n = 0; n < index.numElements(); n++)
81        packet->data(index(n)) = it->second(n);
82    }
83
[1205]84    // Convert missing values to NaN
85    if (hasMissingValue)
86    {
87      const double nanValue = std::numeric_limits<double>::quiet_NaN();
88      const size_t nbData = packet->data.numElements();
89      for (size_t idx = 0; idx < nbData; ++idx)
90      {
91        if (defaultValue == packet->data(idx))
92          packet->data(idx) = nanValue;
93      }
94    }
95
[1006]96    onOutputReady(packet);
[638]97  }
98
[643]99  void CSourceFilter::signalEndOfStream(CDate date)
[638]100  {
[1328]101    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
102
[638]103    CDataPacketPtr packet(new CDataPacket);
[643]104    packet->date = date;
105    packet->timestamp = date;
[638]106    packet->status = CDataPacket::END_OF_STREAM;
[1006]107    onOutputReady(packet);
[638]108  }
109} // namespace xios
Note: See TracBrowser for help on using the repository browser.