source: XIOS/dev/dev_trunk_omp/src/filter/source_filter.cpp @ 1681

Last change on this file since 1681 was 1681, checked in by yushan, 2 years ago

MARK: Dynamic workflow graph developement. Branch up to date with trunk @1676. Bug fixed

File size: 6.2 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
11                               bool compression /*= true*/, bool mask /*= false*/,
12                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
13                               bool hasMissingValue /*= false*/,
14                               double defaultValue /*= 0.0*/)
15    : COutputPin(gc, manualTrigger)
16    , grid(grid)
17    , compression(compression)
18    , mask(mask)
19    , offset(offset)
20    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
21  {
22    if (!grid)
23      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
24            "Impossible to construct a source filter without providing a grid.");
25  }
26
27  void CSourceFilter::buildGraph(CDataPacketPtr packet)
28  {
29    bool building_graph = this->tag ? packet->timestamp >= this->field->field_graph_start && packet->timestamp <= this->field->field_graph_end : false;
30    // bool building_graph = this->tag ? packet->timestamp >= this->start_graph && packet->timestamp <= this->end_graph : false;
31   
32    // std::cout<<"************************************************source filter 1 : field_id = "<<this->field->getId()<<" this->start_graph = "<<this->start_graph<<" this->end_graph = "<<this->end_graph<<std::endl;
33    // std::cout<<"************************************************source filter 2 : field_id = "<<this->field->getId()<<" this->field->field_graph_start = "<<this->field->field_graph_start<<" this->field->field_graph_end = "<<this->field->field_graph_end<<std::endl;
34
35    if(building_graph)
36    {
37      this->filterID = InvalidableObject::filterIdGenerator++; 
38      packet->src_filterID=this->filterID;
39      packet->field = this->field;
40      packet->distance = 1;
41     
42      // if(CXios::isClient) std::cout<<"source filter tag = "<<this->tag<<" start = "<<start_graph<<" end = "<<end_graph<<std::endl;
43   
44      CWorkflowGraph::allocNodeEdge();
45
46      CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet);
47      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
48      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId();
49      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1;
50
51      CWorkflowGraph::build_begin = true;
52    }
53
54    // if(building_graph) std::cout<<packet->timestamp<<"************************************************ source filter : field_id = "<<this->field->getId()<<" start = "<<this->field->field_graph_start<<" end_graph = "<<this->field->field_graph_end<<std::endl;
55  }
56
57
58  template <int N>
59  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
60  {
61    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
62
63    CDataPacketPtr packet(new CDataPacket);
64    packet->date = date;
65    packet->timestamp = date;
66    packet->status = CDataPacket::NO_ERROR;
67
68    packet->data.resize(grid->storeIndex_client.numElements());   
69   
70    if (compression)
71    {
72      packet->data = defaultValue;
73      grid->uncompressField(data, packet->data);   
74    }
75    else
76    {
77      if (mask)
78        grid->maskField(data, packet->data);
79      else
80        grid->inputField(data, packet->data);
81    }
82    // Convert missing values to NaN
83    if (hasMissingValue)
84    {
85      const double nanValue = std::numeric_limits<double>::quiet_NaN();
86      const size_t nbData = packet->data.numElements();
87      for (size_t idx = 0; idx < nbData; ++idx)
88      {
89        if (defaultValue == packet->data(idx))
90          packet->data(idx) = nanValue;
91      }
92    }
93
94    if(CXios::isClient) buildGraph(packet);
95   
96
97
98    onOutputReady(packet);
99  }
100
101  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
102  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
103  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
104  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
105  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
106  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
107  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
108
109
110
111  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
112  {
113    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
114
115    CDataPacketPtr packet(new CDataPacket);
116    packet->date = date;
117    packet->timestamp = date;
118    packet->status = CDataPacket::NO_ERROR;
119   
120    if (data.size() != grid->storeIndex_fromSrv.size())
121      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
122            << "Incoherent data received from servers,"
123            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
124
125    packet->data.resize(grid->storeIndex_client.numElements());
126    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
127    for (it = data.begin(); it != itEnd; it++)
128    {     
129      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
130      for (int n = 0; n < index.numElements(); n++)
131        packet->data(index(n)) = it->second(n);
132    }
133
134    // Convert missing values to NaN
135    if (hasMissingValue)
136    {
137      const double nanValue = std::numeric_limits<double>::quiet_NaN();
138      const size_t nbData = packet->data.numElements();
139      for (size_t idx = 0; idx < nbData; ++idx)
140      {
141        if (defaultValue == packet->data(idx))
142          packet->data(idx) = nanValue;
143      }
144    }
145
146    onOutputReady(packet);
147  }
148
149  void CSourceFilter::signalEndOfStream(CDate date)
150  {
151    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
152
153    CDataPacketPtr packet(new CDataPacket);
154    packet->date = date;
155    packet->timestamp = date;
156    packet->status = CDataPacket::END_OF_STREAM;
157    onOutputReady(packet);
158  }
159
160 
161} // namespace xios
Note: See TracBrowser for help on using the repository browser.