// Implementation of xios::CStoreFilter: an input pin (built as CInputPin(gc, 1))
// that stores every packet it receives in `packets`, keyed by the packet's
// timestamp, so that it can later be fetched with getPacket().
//
// What the visible code does:
//  - Constructor: stores gc, context, grid, detectMissingValues and missingValue,
//    and raises ERROR when either `context` or `grid` is null.
//  - getPacket(timestamp): repeatedly triggers the filter (when canBeTriggered()),
//    looks the timestamp up in `packets`, and otherwise calls
//    context->checkBuffersAndListen(); the loop stops once a packet is found or
//    the timer's cumulated time reaches CXios::recvFieldTimeout. When no packet
//    was found it writes diagnostics to info(0).
//  - buildGraph(data): records distance / attribute information for this filter in
//    CWorkflowGraph::mapFilters_ptr_with_info (only the tail of it is visible here).
//  - onInputReady(data): calls buildGraph(data); when detectMissingValues is set it
//    copies data[0] (date, timestamp, status, data) into a new packet and replaces
//    every NaN value by missingValue, otherwise it keeps data[0] itself. The packet
//    is inserted into `packets` and its timestamp is registered with the garbage
//    collector (registered but never unregistered, see the inline comment).
//  - mustAutoTrigger() always returns false; isDataExpected() always returns true.
//  - invalidate(timestamp): forwards to CInputPin::invalidate, then erases the range
//    [packets.begin(), packets.lower_bound(timestamp)) -- presumably `packets` is an
//    ordered std::map, in which case this drops every packet whose key is lower
//    than `timestamp`; confirm against the header.
//
// NOTE(review): this copy of the file appears to have been damaged by extraction
// and should be restored from version control before any code change is made:
//  - line breaks are collapsed, so each `//` comment now swallows the remainder of
//    its physical line (including the #include directives sharing the first line);
//  - template argument lists seem to have been stripped (`std::map::const_iterator`,
//    `std::vector data`, `NumTraits::isNan`);
//  - the end of getPacket() and the beginning of buildGraph() are missing: the text
//    jumps at `timestamp<first<<` and at `info(0)<filterID]`.
#include "store_filter.hpp" #include "context.hpp" #include "grid.hpp" #include "timer.hpp" #include "file.hpp" namespace xios { CStoreFilter::CStoreFilter(CGarbageCollector& gc, CContext* context, CGrid* grid, bool detectMissingValues /*= false*/, double missingValue /*= 0.0*/) : CInputPin(gc, 1) , gc(gc) , context(context) , grid(grid) , detectMissingValues(detectMissingValues) , missingValue(missingValue) { if (!context) ERROR("CStoreFilter::CStoreFilter(CContext* context, CGrid* grid)", "Impossible to construct a store filter without providing a context."); if (!grid) ERROR("CStoreFilter::CStoreFilter(CContext* context, CGrid* grid)", "Impossible to construct a store filter without providing a grid."); } CConstDataPacketPtr CStoreFilter::getPacket(Time timestamp) { CTimer timer("CStoreFilter::getPacket"); CConstDataPacketPtr packet; const double timeout = CXios::recvFieldTimeout; do { if (canBeTriggered()) trigger(timestamp); timer.resume(); std::map::const_iterator it = packets.find(timestamp); if (it != packets.end()) packet = it->second; else // if the packet is not available yet, check if it can be received context->checkBuffersAndListen(); timer.suspend(); } while (!packet && timer.getCumulatedTime() < timeout); if (!packet) { std::map::const_iterator it ; info(0)<<"Impossible to get the packet with timestamp = " << timestamp<first<<" "; info(0)<filterID].distance = ++(data[0]->distance); (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes(); if(this->field->file) (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes += "
file attributes :
" +this->field->file->record4graphXiosAttributes(); // if(CXios::isClient) std::cout<<"CStoreFilter::apply filter tag = "<tag<filterID, data[0]);; (*CWorkflowGraph::mapFilters_ptr_with_info)[data[0]->src_filterID].filter_filled = 0; } else CWorkflowGraph::build_begin = true; } } void CStoreFilter::onInputReady(std::vector data) { buildGraph(data); CDataPacketPtr packet; if (detectMissingValues) { const size_t nbData = data[0]->data.numElements(); packet = CDataPacketPtr(new CDataPacket); packet->date = data[0]->date; packet->timestamp = data[0]->timestamp; packet->status = data[0]->status; packet->data.resize(nbData); packet->data = data[0]->data; for (size_t idx = 0; idx < nbData; ++idx) { if (NumTraits::isNan(packet->data(idx))) packet->data(idx) = missingValue; } } else { packet = data[0]; } packets.insert(std::make_pair(packet->timestamp, packet)); // The packet is always destroyed by the garbage collector // so we register but never unregister gc.registerObject(this, packet->timestamp); } bool CStoreFilter::mustAutoTrigger() const { return false; } bool CStoreFilter::isDataExpected(const CDate& date) const { return true; } void CStoreFilter::invalidate(Time timestamp) { CInputPin::invalidate(timestamp); packets.erase(packets.begin(), packets.lower_bound(timestamp)); } } // namespace xios