source: XIOS/trunk/src/filter/source_filter.cpp @ 1928

Last change on this file since 1928 was 1928, checked in by yushan, 4 years ago

trunk : workflow graph now working with field read from file

File size: 5.5 KB
RevLine 
[638]1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
[756]4#include "calendar_util.hpp"
[1158]5#include <limits>
[1704]6#include "workflow_graph.hpp"
[638]7
8namespace xios
9{
[1637]10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
[1158]12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/)
[1021]15    : COutputPin(gc, manualTrigger)
16    , grid(grid)
[1241]17    , compression(compression)
[1637]18    , mask(mask)
[756]19    , offset(offset)
[1158]20    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
[638]21  {
22    if (!grid)
23      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
24            "Impossible to construct a source filter without providing a grid.");
25  }
26
[1704]27  void CSourceFilter::buildGraph(CDataPacketPtr packet)
28  {
[1876]29    bool filter_interval=false;
30    if (this->field)
31    {
32      if(this->field->field_graph_start == -1 && this->field->field_graph_end == -1) filter_interval = true;
33      else filter_interval = packet->timestamp >= this->field->field_graph_start && packet->timestamp <= this->field->field_graph_end;
34    }
35    bool building_graph = this->tag ? filter_interval : false;
[1704]36    if(building_graph)
37    {
38      this->filterID = InvalidableObject::filterIdGenerator++; 
39      packet->src_filterID=this->filterID;
40      packet->field = this->field;
41      packet->distance = 1;
42     
43      CWorkflowGraph::allocNodeEdge();
[1877]44     
[1704]45      CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet);
46      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
47      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId();
48      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1;
49
50      CWorkflowGraph::build_begin = true;
51    }
52
53  }
54
55
[638]56  template <int N>
[643]57  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
[638]58  {
[756]59    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
60
[638]61    CDataPacketPtr packet(new CDataPacket);
[643]62    packet->date = date;
63    packet->timestamp = date;
[638]64    packet->status = CDataPacket::NO_ERROR;
65
[1250]66    packet->data.resize(grid->storeIndex_client.numElements());   
67   
68    if (compression)
69    {
70      packet->data = defaultValue;
71      grid->uncompressField(data, packet->data);   
72    }
73    else
[1637]74    {
75      if (mask)
76        grid->maskField(data, packet->data);
77      else
78        grid->inputField(data, packet->data);
79    }
[1158]80    // Convert missing values to NaN
81    if (hasMissingValue)
82    {
[1201]83      const double nanValue = std::numeric_limits<double>::quiet_NaN();
84      const size_t nbData = packet->data.numElements();
[1158]85      for (size_t idx = 0; idx < nbData; ++idx)
86      {
87        if (defaultValue == packet->data(idx))
88          packet->data(idx) = nanValue;
89      }
90    }
91
[1704]92    if(CXios::isClient) buildGraph(packet);
93   
94
95
[1021]96    onOutputReady(packet);
[638]97  }
98
[643]99  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
100  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
101  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
[932]102  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
103  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
104  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
105  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
[638]106
[643]107  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
[638]108  {
[756]109    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
110
[638]111    CDataPacketPtr packet(new CDataPacket);
[643]112    packet->date = date;
113    packet->timestamp = date;
[638]114    packet->status = CDataPacket::NO_ERROR;
[1249]115   
[1021]116    if (data.size() != grid->storeIndex_fromSrv.size())
[643]117      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
[638]118            << "Incoherent data received from servers,"
[1021]119            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
[638]120
121    packet->data.resize(grid->storeIndex_client.numElements());
122    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
123    for (it = data.begin(); it != itEnd; it++)
[1249]124    {     
[1021]125      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
[638]126      for (int n = 0; n < index.numElements(); n++)
127        packet->data(index(n)) = it->second(n);
128    }
129
[1201]130    // Convert missing values to NaN
131    if (hasMissingValue)
132    {
133      const double nanValue = std::numeric_limits<double>::quiet_NaN();
134      const size_t nbData = packet->data.numElements();
135      for (size_t idx = 0; idx < nbData; ++idx)
136      {
137        if (defaultValue == packet->data(idx))
138          packet->data(idx) = nanValue;
139      }
140    }
[1928]141    if(CXios::isClient) buildGraph(packet);
[1021]142    onOutputReady(packet);
[638]143  }
144
[643]145  void CSourceFilter::signalEndOfStream(CDate date)
[638]146  {
[1210]147    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
148
[638]149    CDataPacketPtr packet(new CDataPacket);
[643]150    packet->date = date;
151    packet->timestamp = date;
[1928]152    packet->status = CDataPacket::END_OF_STREAM; 
[1021]153    onOutputReady(packet);
[638]154  }
155} // namespace xios
Note: See TracBrowser for help on using the repository browser.