source: XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_model_store_filter.cpp @ 2230

Last change on this file since 2230 was 2143, checked in by yushan, 3 years ago

Big commit on graph functionality. Add buildWorkflowGraph function for filters

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 4.7 KB
Line 
1#include "client_to_model_store_filter.hpp"
2#include "context.hpp"
3#include "grid.hpp"
4#include "timer.hpp"
5#include "tracer.hpp"
6#include "workflow_graph.hpp"
7
8namespace xios
9{
10  CClientToModelStoreFilter::CClientToModelStoreFilter(CGarbageCollector& gc, CField* field)
11    : CInputPin(gc, 1)
12    , gc_(gc), graphEnabled(false)
13  {
14    context_ = CContext::getCurrent() ;
15    grid_ = field->getGrid() ;
16
17    detectMissingValues_ = (!field->detect_missing_value.isEmpty() && !field->default_value.isEmpty() && field->detect_missing_value == true);
18    hasMissingValue_ = !field->default_value.isEmpty() ;
19    if (hasMissingValue_) missingValue_  = field->default_value ;
20    detectMissingValues_ = (!field->detect_missing_value.isEmpty() && hasMissingValue_);
21  }
22
23  CConstDataPacketPtr CClientToModelStoreFilter::getPacket(Time timestamp)
24  {
25    CTimer timer("CStoreFilter::getPacket");
26//    timer.resume();
27    info(0)<<"ENTERING CStoreFilter::getPacket"<<std::endl ;
28    traceOff() ;
29//    timer.suspend();
30    CConstDataPacketPtr packet;
31    const double timeout = CXios::recvFieldTimeout;
32
33    do
34    {
35      if (canBeTriggered()) trigger(timestamp);
36
37      timer.resume();
38
39      std::map<Time, CDataPacketPtr>::const_iterator it = packets_.find(timestamp);
40      if (it != packets_.end()) packet = it->second;
41      else  context_->eventLoop(); // if the packet is not available yet, check if it can be received
42
43      timer.suspend();
44    } while (!packet && timer.getCumulatedTime() < timeout);
45//    timer.resume();
46    traceOn() ;
47//    timer.suspend();
48
49    if (!packet)
50    {
51      std::map<Time, CDataPacketPtr>::const_iterator it ;
52      info(0)<<"Impossible to get the packet with timestamp = " << timestamp<<std::endl<<"Available timestamp are : "<<std::endl ;
53      for(it=packets_.begin();it!=packets_.end();++it) info(0)<<it->first<<"  ";
54      info(0)<<std::endl ;
55      ERROR("CConstDataPacketPtr CStoreFilter::getPacket(Time timestamp) const",
56            << "Impossible to get the packet with timestamp = " << timestamp);
57    }
58    return packet;
59  }
60
61  template <int N>
62  CDataPacket::StatusCode CClientToModelStoreFilter::getData(Time timestamp, CArray<double, N>& data)
63  {
64    CConstDataPacketPtr packet = getPacket(timestamp);
65
66    if (packet->status == CDataPacket::NO_ERROR)
67    {
68      if (hasMissingValue_) grid_->getWorkflowToModelConnector()->transfer(packet->data, data, missingValue_);
69      else grid_->getWorkflowToModelConnector()->transfer(packet->data, data);
70    }
71    return packet->status;
72  }
73
74  template CDataPacket::StatusCode CClientToModelStoreFilter::getData<1>(Time timestamp, CArray<double, 1>& data);
75  template CDataPacket::StatusCode CClientToModelStoreFilter::getData<2>(Time timestamp, CArray<double, 2>& data);
76  template CDataPacket::StatusCode CClientToModelStoreFilter::getData<3>(Time timestamp, CArray<double, 3>& data);
77  template CDataPacket::StatusCode CClientToModelStoreFilter::getData<4>(Time timestamp, CArray<double, 4>& data);
78  template CDataPacket::StatusCode CClientToModelStoreFilter::getData<5>(Time timestamp, CArray<double, 5>& data);
79  template CDataPacket::StatusCode CClientToModelStoreFilter::getData<6>(Time timestamp, CArray<double, 6>& data);
80  template CDataPacket::StatusCode CClientToModelStoreFilter::getData<7>(Time timestamp, CArray<double, 7>& data);
81
82  void CClientToModelStoreFilter::onInputReady(std::vector<CDataPacketPtr> data)
83  {
84
85    CDataPacketPtr packet;
86    if (detectMissingValues_)
87    {
88      const size_t nbData = data[0]->data.numElements();
89
90      packet = CDataPacketPtr(new CDataPacket);
91      packet->date = data[0]->date;
92      packet->timestamp = data[0]->timestamp;
93      packet->status = data[0]->status;
94      packet->data.resize(nbData);
95      packet->data = data[0]->data;
96
97      for (size_t idx = 0; idx < nbData; ++idx)
98      {
99        if (NumTraits<double>::isNan(packet->data(idx)))
100          packet->data(idx) = missingValue_;
101      }
102
103    }
104
105    else
106    {
107      packet = data[0];
108    }
109
110    if(this->graphEnabled)
111    {
112      this->graphPackage->filterId = CWorkflowGraph::getNodeSize();
113      CWorkflowGraph::addNode("Client to Model Store filter", 5, true, 1, packet);
114    }
115
116
117    packets_.insert(std::make_pair(packet->timestamp, packet));
118    // The packet is always destroyed by the garbage collector
119    // so we register but never unregister
120    gc_.registerObject(this, packet->timestamp);
121
122  }
123
124  bool CClientToModelStoreFilter::mustAutoTrigger() const
125  {
126    return false;
127  }
128
129  bool CClientToModelStoreFilter::isDataExpected(const CDate& date) const
130  {
131    return true;
132  }
133
134  void CClientToModelStoreFilter::invalidate(Time timestamp)
135  {
136    CInputPin::invalidate(timestamp);
137    packets_.erase(packets_.begin(), packets_.lower_bound(timestamp));
138  }
139} // namespace xios
Note: See TracBrowser for help on using the repository browser.