source: XIOS/dev/dev_trunk_omp/src/filter/binary_arithmetic_filter.cpp @ 1681

Last change on this file since 1681 was 1681, checked in by yushan, 2 years ago

MARK: Dynamic workflow graph development. Branch up to date with trunk @1676. Bug fixed

File size: 12.0 KB
Line 
1#include "binary_arithmetic_filter.hpp"
2#include "workflow_graph.hpp"
3#include "yacc_var.hpp"
4#include "file.hpp"
5
6
7namespace xios
8{
9  CScalarFieldArithmeticFilter::CScalarFieldArithmeticFilter(CGarbageCollector& gc, const std::string& op, double value)
10    : CFilter(gc, 1, this)
11    , op(operatorExpr.getOpScalarField(op))
12    , value(value)
13  { 
14    expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1);
15  };
16
17  std::tuple<int, int, int> CScalarFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data)
18  {
19    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
20    // bool building_graph = true;
21    int unique_filter_id;
22    bool firstround;
23
24    if(building_graph)
25    {
26      CWorkflowGraph::allocNodeEdge();
27
28      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId());
29
30      // first round
31      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end())
32      {
33        firstround = true;
34        this->filterID = InvalidableObject::filterIdGenerator++;
35        int edgeID = InvalidableObject::edgeIdGenerator++;
36
37        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]);
38        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag;
39        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1;
40
41
42        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
43        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes();
44     
45
46        if(CWorkflowGraph::build_begin)
47        {
48          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
49          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++;
50
51          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;
52        }
53        else CWorkflowGraph::build_begin = true;
54
55        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 
56        unique_filter_id = this->filterID;
57      }
58      // not first round
59      else 
60      {
61        firstround=false;
62        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash];
63        if(data[0]->src_filterID != unique_filter_id)
64        {
65          int edgeID = InvalidableObject::edgeIdGenerator++;
66          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 
67          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
68          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++;
69        }
70      } 
71    }
72
73    return std::make_tuple(building_graph, firstround, unique_filter_id);
74  }
75
76
77  CDataPacketPtr CScalarFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data)
78  {
79   
80    CDataPacketPtr packet(new CDataPacket);
81    packet->date = data[0]->date;
82    packet->timestamp = data[0]->timestamp;
83    packet->status = data[0]->status;
84
85    std::tuple<int, int, int> graph = buildGraph(data);
86
87    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph);
88    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1;
89    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance;
90
91    packet->field = this->field;
92
93    if (packet->status == CDataPacket::NO_ERROR)
94      packet->data.reference(op(value, data[0]->data));
95
96    return packet;
97  }
98
99  CFieldScalarArithmeticFilter::CFieldScalarArithmeticFilter(CGarbageCollector& gc, const std::string& op, double value)
100    : CFilter(gc, 1, this)
101    , op(operatorExpr.getOpFieldScalar(op))
102    , value(value)
103  { 
104    expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1);
105  };
106
107  std::tuple<int, int, int> CFieldScalarArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data)
108  {
109    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
110    // bool building_graph = true;
111    int unique_filter_id;
112    bool firstround;
113
114    if(building_graph)
115    {
116      CWorkflowGraph::allocNodeEdge();
117
118      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId());
119
120      // first round
121      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end())
122      {
123        firstround = true;
124        this->filterID = InvalidableObject::filterIdGenerator++;
125        int edgeID = InvalidableObject::edgeIdGenerator++;
126
127        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]);
128        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag;
129        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1;
130
131
132        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
133        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes();
134     
135
136        if(CWorkflowGraph::build_begin)
137        {
138          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
139          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++;
140
141          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;
142        }
143        else CWorkflowGraph::build_begin = true;
144
145        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 
146        unique_filter_id = this->filterID;
147      }
148      // not first round
149      else 
150      {
151        firstround=false;
152        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash];
153        if(data[0]->src_filterID != unique_filter_id)
154        {
155          int edgeID = InvalidableObject::edgeIdGenerator++;
156          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 
157          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
158          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++;
159        }
160      } 
161    }
162
163    return std::make_tuple(building_graph, firstround, unique_filter_id);
164  }
165
166  CDataPacketPtr CFieldScalarArithmeticFilter::apply(std::vector<CDataPacketPtr> data)
167  {
168
169    CDataPacketPtr packet(new CDataPacket);
170    packet->date = data[0]->date;
171    packet->timestamp = data[0]->timestamp;
172    packet->status = data[0]->status;
173
174    std::tuple<int, int, int> graph = buildGraph(data);
175
176    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph);
177    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1;
178    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance;
179
180    packet->field = this->field;
181
182    if (packet->status == CDataPacket::NO_ERROR)
183      packet->data.reference(op(data[0]->data, value));
184
185    return packet;
186  }
187
188  CFieldFieldArithmeticFilter::CFieldFieldArithmeticFilter(CGarbageCollector& gc, const std::string& op)
189    : CFilter(gc, 2, this)
190    , op(operatorExpr.getOpFieldField(op))
191  { 
192    expression.assign(*yacc_globalInputText_ptr, 0, yacc_globalInputText_ptr->size()-1);
193  };
194
195  std::tuple<int, int, int> CFieldFieldArithmeticFilter::buildGraph(std::vector<CDataPacketPtr> data)
196  {
197    bool building_graph = this->tag ? ((data[0]->timestamp >= this->field->field_graph_start && data[0]->timestamp <= this->field->field_graph_end) && (data[0]->timestamp == data[1]->timestamp)) : false;
198
199    int unique_filter_id;
200
201    bool firstround;
202
203    if(building_graph)
204    { 
205      CWorkflowGraph::allocNodeEdge();
206
207      // std::cout<<"CFieldFieldArithmeticFilter::apply filter tag = "<<this->tag<<std::endl;
208
209      size_t filterhash = std::hash<StdString>{}(expression+to_string(data[0]->timestamp)+this->field->getId());
210
211      // first round
212      if(CWorkflowGraph::mapHashFilterID_ptr->find(filterhash) == CWorkflowGraph::mapHashFilterID_ptr->end())
213      {
214        firstround = true;
215        this->filterID = InvalidableObject::filterIdGenerator++;
216        int edgeID = InvalidableObject::edgeIdGenerator++;
217   
218        CWorkflowGraph::addNode(this->filterID, "Arithmetic Filter\\n("+expression+")", 3, 1, 0, data[0]);
219        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
220        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = data[0]->distance+1;
221
222        if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "</br>file attributes : </br>" +this->field->file->record4graphXiosAttributes();
223   
224        (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_tag = this->tag;
225        if(CWorkflowGraph::build_begin)
226        {
227
228          CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);
229          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++;
230
231          edgeID = InvalidableObject::edgeIdGenerator++;
232
233          CWorkflowGraph::addEdge(edgeID, this->filterID, data[1]);
234          (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb ++;
235
236          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ;
237          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ;
238        }
239        CWorkflowGraph::build_begin = true;
240
241        (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash] = this->filterID; 
242        unique_filter_id = this->filterID;
243 
244      }
245      // not first round
246      else 
247      {
248        firstround = false;
249        unique_filter_id = (*CWorkflowGraph::mapHashFilterID_ptr)[filterhash];
250        if(data[0]->src_filterID != unique_filter_id)
251        {
252          int edgeID = InvalidableObject::edgeIdGenerator++;
253          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[0]); 
254          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0 ; 
255          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++;
256        }
257        if(data[1]->src_filterID != unique_filter_id)
258        { 
259          int edgeID = InvalidableObject::edgeIdGenerator++;
260          CWorkflowGraph::addEdge(edgeID, unique_filter_id, data[1]); 
261          (*CWorkflowGraph::mapFilters_ptr_with_info)[data[1]->src_filterID].filter_filled = 0 ;
262          (*CWorkflowGraph::mapFilters_ptr_with_info)[unique_filter_id].expected_entry_nb ++;
263        }
264       
265      } 
266    }
267
268    return std::make_tuple(building_graph, firstround, unique_filter_id);
269  }
270
271  CDataPacketPtr CFieldFieldArithmeticFilter::apply(std::vector<CDataPacketPtr> data)
272  {
273
274    CDataPacketPtr packet(new CDataPacket);
275    packet->date = data[0]->date;
276    packet->timestamp = data[0]->timestamp;
277
278    std::tuple<int, int, int> graph = buildGraph(data);
279
280    if(std::get<0>(graph)) packet->src_filterID = std::get<2>(graph);
281    if(std::get<0>(graph) && std::get<1>(graph)) packet->distance = data[0]->distance+1;
282    if(std::get<0>(graph) && !std::get<1>(graph)) packet->distance = data[0]->distance;
283   
284    packet->field = this->field;
285   
286
287    if (data[0]->status != CDataPacket::NO_ERROR)
288      packet->status = data[0]->status;
289    else if (data[1]->status != CDataPacket::NO_ERROR)
290      packet->status = data[1]->status;
291    else
292    {
293      packet->status = CDataPacket::NO_ERROR;
294      packet->data.reference(op(data[0]->data, data[1]->data));
295    }
296
297    return packet;
298  }
299
300  StdString CScalarFieldArithmeticFilter::GetName(void)    { return StdString("CScalarFieldArithmeticFilter"); }
301  StdString CFieldScalarArithmeticFilter::GetName(void)    { return StdString("CFieldScalarArithmeticFilter"); }
302  StdString CFieldFieldArithmeticFilter::GetName(void)     { return StdString("CFieldFieldArithmeticFilter"); }
303
304
305} // namespace xios
Note: See TracBrowser for help on using the repository browser.