source: XIOS/dev/dev_trunk_omp/src/filter/store_filter.cpp @ 1679

Last change on this file since 1679 was 1679, checked in by yushan, 5 years ago

MARK: Dynamic workflow graph development. Branch up to date with trunk @1676. Using vis.js

File size: 5.8 KB
RevLine 
[638]1#include "store_filter.hpp"
2#include "context.hpp"
3#include "grid.hpp"
4#include "timer.hpp"
5
6namespace xios
7{
[1201]8  CStoreFilter::CStoreFilter(CGarbageCollector& gc, CContext* context, CGrid* grid,
9                             bool detectMissingValues /*= false*/, double missingValue /*= 0.0*/)
[639]10    : CInputPin(gc, 1)
[1021]11    , gc(gc)
[638]12    , context(context)
13    , grid(grid)
[1201]14    , detectMissingValues(detectMissingValues)
15    , missingValue(missingValue)
[638]16  {
17    if (!context)
18      ERROR("CStoreFilter::CStoreFilter(CContext* context, CGrid* grid)",
19            "Impossible to construct a store filter without providing a context.");
[683]20    if (!grid)
[638]21      ERROR("CStoreFilter::CStoreFilter(CContext* context, CGrid* grid)",
22            "Impossible to construct a store filter without providing a grid.");
23  }
24
25  CConstDataPacketPtr CStoreFilter::getPacket(Time timestamp)
26  {
27    CTimer timer("CStoreFilter::getPacket");
28    CConstDataPacketPtr packet;
[1158]29    const double timeout = CXios::recvFieldTimeout;
[638]30
31    do
32    {
[1021]33      if (canBeTriggered())
34        trigger(timestamp);
35
[638]36      timer.resume();
37
38      std::map<Time, CDataPacketPtr>::const_iterator it = packets.find(timestamp);
39      if (it != packets.end())
40        packet = it->second;
41      else // if the packet is not available yet, check if it can be received
42        context->checkBuffersAndListen();
43
44      timer.suspend();
45    } while (!packet && timer.getCumulatedTime() < timeout);
46
47    if (!packet)
[1021]48    {
49      std::map<Time, CDataPacketPtr>::const_iterator it ;
[1646]50      #pragma omp critical (_output)
51      {
52        info(0)<<"Impossible to get the packet with timestamp = " << timestamp<<std::endl<<"Available timestamp are : "<<std::endl ;
53      }
54      for(it=packets.begin();it!=packets.end();++it)
55      {
56        #pragma omp critical (_output)
57        {
58          info(0)<<it->first<<"  ";
59        }
60      }
61      #pragma omp critical (_output)
62      { 
63        info(0)<<std::endl ;
64      }
[638]65      ERROR("CConstDataPacketPtr CStoreFilter::getPacket(Time timestamp) const",
66            << "Impossible to get the packet with timestamp = " << timestamp);
[1021]67    }
[638]68    return packet;
69  }
70
71  template <int N>
72  CDataPacket::StatusCode CStoreFilter::getData(Time timestamp, CArray<double, N>& data)
73  {
74    CConstDataPacketPtr packet = getPacket(timestamp);
75
76    if (packet->status == CDataPacket::NO_ERROR)
77      grid->outputField(packet->data, data);
78
79    return packet->status;
80  }
81
82  template CDataPacket::StatusCode CStoreFilter::getData<1>(Time timestamp, CArray<double, 1>& data);
83  template CDataPacket::StatusCode CStoreFilter::getData<2>(Time timestamp, CArray<double, 2>& data);
84  template CDataPacket::StatusCode CStoreFilter::getData<3>(Time timestamp, CArray<double, 3>& data);
[932]85  template CDataPacket::StatusCode CStoreFilter::getData<4>(Time timestamp, CArray<double, 4>& data);
86  template CDataPacket::StatusCode CStoreFilter::getData<5>(Time timestamp, CArray<double, 5>& data);
87  template CDataPacket::StatusCode CStoreFilter::getData<6>(Time timestamp, CArray<double, 6>& data);
88  template CDataPacket::StatusCode CStoreFilter::getData<7>(Time timestamp, CArray<double, 7>& data);
[638]89
90  void CStoreFilter::onInputReady(std::vector<CDataPacketPtr> data)
91  {
[1201]92
[1679]93    bool building_graph = this->tag ? data[0]->timestamp >= this->start_graph && data[0]->timestamp <= this->end_graph : false;
94
95    if(building_graph)
[1677]96    {
97      this->filterID = InvalidableObject::filterIdGenerator++;
[1679]98      int edgeID = InvalidableObject::edgeIdGenerator++;
[1677]99
[1679]100      if(CWorkflowGraph::mapFieldToFilters_ptr_with_info==0) CWorkflowGraph::mapFieldToFilters_ptr_with_info = new std::unordered_map <int, graph_info_box_edge >;
101      if(CWorkflowGraph::mapFilters_ptr_with_info==0) CWorkflowGraph::mapFilters_ptr_with_info = new std::unordered_map <int, graph_info_box_node>;
[1677]102
103
[1679]104      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_name = "Store Filter";
105      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_class = 4;
106      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].filter_filled = 1;
107      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].expected_entry_nb = 1;
108      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].date = data[0]->date;
109      (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].timestamp = data[0]->timestamp;
110
[1677]111      if(CXios::isClient) std::cout<<"CStoreFilter::apply filter tag = "<<this->tag<<std::endl;
112
[1679]113      if(CXios::isClient && CWorkflowGraph::build_begin) 
[1677]114      {
[1679]115        CWorkflowGraph::addEdge(edgeID, this->filterID, data[0]);;
116        (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0;
[1677]117      }
[1679]118      else CWorkflowGraph::build_begin = true;
[1677]119    }
120
121
122
[1251]123    CDataPacketPtr packet;
[1201]124    if (detectMissingValues)
125    {
[1252]126      const size_t nbData = data[0]->data.numElements();
[1251]127
[1252]128      packet = CDataPacketPtr(new CDataPacket);
129      packet->date = data[0]->date;
130      packet->timestamp = data[0]->timestamp;
131      packet->status = data[0]->status;
132      packet->data.resize(nbData);
133      packet->data = data[0]->data;
134
[1201]135      for (size_t idx = 0; idx < nbData; ++idx)
136      {
[1474]137        if (NumTraits<double>::isNan(packet->data(idx)))
[1252]138          packet->data(idx) = missingValue;
[1201]139      }
[1251]140
[1201]141    }
[1251]142
143    else
144    {
145      packet = data[0];
146    }
147
148    packets.insert(std::make_pair(packet->timestamp, packet));
149    // The packet is always destroyed by the garbage collector
150    // so we register but never unregister
151    gc.registerObject(this, packet->timestamp);
152
[638]153  }
[639]154
[1358]155  bool CStoreFilter::mustAutoTrigger() const
156  {
157    return false;
158  }
159
[1158]160  bool CStoreFilter::isDataExpected(const CDate& date) const
161  {
162    return true;
163  }
164
[639]165  void CStoreFilter::invalidate(Time timestamp)
166  {
167    CInputPin::invalidate(timestamp);
168    packets.erase(packets.begin(), packets.lower_bound(timestamp));
169  }
[1668]170
171
[638]172} // namespace xios
Note: See TracBrowser for help on using the repository browser.