source: XIOS/dev/dev_oa/src/filter/source_filter.cpp @ 2034

Last change on this file since 2034 was 2034, checked in by oabramkina, 3 years ago

Adding a possibility of tiled and non-tiled sent on the same domain.

File size: 7.1 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/)
15    : COutputPin(gc, manualTrigger)
16    , grid(grid)
17    , compression(compression)
18    , mask(mask)
19    , offset(offset)
20    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
21    , ntiles(0)
22    , storedTileData()
23  {
24    if (!grid)
25      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
26            "Impossible to construct a source filter without providing a grid.");
27  }
28
29  void CSourceFilter::buildGraph(CDataPacketPtr packet)
30  {
31    bool filter_interval=false;
32    if (this->field)
33    {
34      if(this->field->field_graph_start == -1 && this->field->field_graph_end == -1) filter_interval = true;
35      else filter_interval = packet->timestamp >= this->field->field_graph_start && packet->timestamp <= this->field->field_graph_end;
36    }
37    bool building_graph = this->tag ? filter_interval : false;
38    if(building_graph)
39    {
40      this->filterID = InvalidableObject::filterIdGenerator++; 
41      packet->src_filterID=this->filterID;
42      packet->field = this->field;
43      packet->distance = 1;
44     
45      CWorkflowGraph::allocNodeEdge();
46     
47      CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet);
48      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
49      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId();
50      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1;
51
52      CWorkflowGraph::build_begin = true;
53    }
54
55  }
56
57  template <int N>
58  void CSourceFilter::streamTile(CDate date, const CArray<double, N>& tileData, int tileId)
59  {
60    if (ntiles==0)
61    {
62      const double nanValue = std::numeric_limits<double>::quiet_NaN();
63//      storedTileData.resize(grid->storeIndex_client.numElements());
64      storedTileData.resize(grid->getDataSize());
65      storedTileData = nanValue;
66    }
67    grid->copyTile(tileData, storedTileData, tileId);
68    ++ntiles;
69    if (ntiles==grid->getNTiles())
70    {
71      // Data entering workflow will be exactly of size ni*nj for a grid 2d or ni*nj*n for a grid 3d
72      streamData(date, storedTileData, true);
73      ntiles = 0;
74    }
75  }
76
77  template <int N>
78  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data, bool isTiled)
79  {
80    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
81
82    CDataPacketPtr packet(new CDataPacket);
83    packet->date = date;
84    packet->timestamp = date;
85    packet->status = CDataPacket::NO_ERROR;
86
87    packet->data.resize(grid->storeIndex_client.numElements());   
88   
89    if (compression)
90    {
91      packet->data = defaultValue;
92      grid->uncompressField(data, packet->data);   
93    }
94    else
95    {
96      if (mask)
97        if (isTiled)
98          grid->maskField(data, packet->data, isTiled);
99        else
100          grid->maskField(data, packet->data);
101      else
102        grid->inputField(data, packet->data);
103    }
104    // Convert missing values to NaN
105    if (hasMissingValue)
106    {
107      const double nanValue = std::numeric_limits<double>::quiet_NaN();
108      const size_t nbData = packet->data.numElements();
109      for (size_t idx = 0; idx < nbData; ++idx)
110      {
111        if (defaultValue == packet->data(idx))
112          packet->data(idx) = nanValue;
113      }
114    }
115
116    if(CXios::isClient) buildGraph(packet);
117   
118
119
120    onOutputReady(packet);
121  }
122
123  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data, bool isTiled);
124  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data, bool isTiled);
125  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data, bool isTiled);
126  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data, bool isTiled);
127  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data, bool isTiled);
128  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data, bool isTiled);
129  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data, bool isTiled);
130
131  template void CSourceFilter::streamTile<1>(CDate date, const CArray<double, 1>& data, int ntile);
132  template void CSourceFilter::streamTile<2>(CDate date, const CArray<double, 2>& data, int ntile);
133  template void CSourceFilter::streamTile<3>(CDate date, const CArray<double, 3>& data, int ntile);
134  template void CSourceFilter::streamTile<4>(CDate date, const CArray<double, 4>& data, int ntile);
135  template void CSourceFilter::streamTile<5>(CDate date, const CArray<double, 5>& data, int ntile);
136  template void CSourceFilter::streamTile<6>(CDate date, const CArray<double, 6>& data, int ntile);
137  template void CSourceFilter::streamTile<7>(CDate date, const CArray<double, 7>& data, int ntile);
138
139  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
140  {
141    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
142
143    CDataPacketPtr packet(new CDataPacket);
144    packet->date = date;
145    packet->timestamp = date;
146    packet->status = CDataPacket::NO_ERROR;
147   
148    if (data.size() != grid->storeIndex_fromSrv.size())
149      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
150            << "Incoherent data received from servers,"
151            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
152
153    packet->data.resize(grid->storeIndex_client.numElements());
154    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
155    for (it = data.begin(); it != itEnd; it++)
156    {     
157      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
158      for (int n = 0; n < index.numElements(); n++)
159        packet->data(index(n)) = it->second(n);
160    }
161
162    // Convert missing values to NaN
163    if (hasMissingValue)
164    {
165      const double nanValue = std::numeric_limits<double>::quiet_NaN();
166      const size_t nbData = packet->data.numElements();
167      for (size_t idx = 0; idx < nbData; ++idx)
168      {
169        if (defaultValue == packet->data(idx))
170          packet->data(idx) = nanValue;
171      }
172    }
173    if(CXios::isClient) buildGraph(packet);
174    onOutputReady(packet);
175  }
176
177  void CSourceFilter::signalEndOfStream(CDate date)
178  {
179    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
180
181    CDataPacketPtr packet(new CDataPacket);
182    packet->date = date;
183    packet->timestamp = date;
184    packet->status = CDataPacket::END_OF_STREAM; 
185    onOutputReady(packet);
186  }
187} // namespace xios
Note: See TracBrowser for help on using the repository browser.