#include "source_filter.hpp"
#include "grid.hpp"
#include "exception.hpp"
#include "calendar_util.hpp"
#include <limits>
#include <map>
#include "workflow_graph.hpp"

// NOTE(review): the template arguments in this file (<int N>, CArray<double, N>,
// std::map<int, CArray<double, 1> >, CArray<int, 1>, std::numeric_limits<double>)
// were reconstructed from how the values are used below -- confirm them against
// the declarations in source_filter.hpp and grid.hpp.

namespace xios
{
  namespace
  {
    /*!
     * Replaces, in place, every element of \p values that compares equal to
     * \p missingValue with a quiet NaN.
     *
     * The comparison is an exact floating-point equality on purpose: the
     * missing value is a sentinel, not a measured quantity.
     *
     * \param values       array exposing numElements() and operator()(index)
     * \param missingValue sentinel to look for
     */
    template <typename ArrayType>
    void convertMissingValuesToNaN(ArrayType& values, double missingValue)
    {
      const double nanValue = std::numeric_limits<double>::quiet_NaN();
      const size_t nbData = values.numElements();
      for (size_t idx = 0; idx < nbData; ++idx)
      {
        if (missingValue == values(idx))
          values(idx) = nanValue;
      }
    }
  } // anonymous namespace

  /*!
   * Constructs a source filter bound to a grid.
   *
   * \param gc              garbage collector forwarded to COutputPin
   * \param grid            grid used to lay out incoming data; must not be null
   * \param compression     if true, streamData() pre-fills the packet with
   *                        \p defaultValue and calls CGrid::uncompressField()
   * \param mask            only read when \p compression is false: selects
   *                        CGrid::maskField() (true) or CGrid::inputField() (false)
   * \param offset          duration added to every date handled by this filter
   * \param manualTrigger   forwarded to COutputPin
   * \param hasMissingValue if true, packet values equal to \p defaultValue are
   *                        converted to NaN before being sent downstream
   * \param defaultValue    fill value / missing-value sentinel
   *
   * Raises an error through the ERROR macro when \p grid is null.
   */
  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
                               bool compression /*= true*/, bool mask /*= false*/,
                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
                               bool hasMissingValue /*= false*/,
                               double defaultValue /*= 0.0*/)
    : COutputPin(gc, manualTrigger)
    , grid(grid)
    , compression(compression)
    , mask(mask)
    , offset(offset)
    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
  {
    if (!grid)
      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
            "Impossible to construct a source filter without providing a grid.");
  }

  /*!
   * Registers this filter as a node of the workflow graph, if applicable.
   *
   * Nothing is recorded unless this filter is tagged, has a field attached and
   * the packet timestamp lies inside the field's graph interval. An interval
   * whose start and end are both -1 accepts every timestamp.
   *
   * \param packet packet about to be emitted; on registration it receives the
   *               new filter id, the field and a distance of 1
   */
  void CSourceFilter::buildGraph(CDataPacketPtr packet)
  {
    bool filter_interval = false;
    if (this->field)
    {
      if (this->field->field_graph_start == -1 && this->field->field_graph_end == -1)
        filter_interval = true;
      else
        filter_interval = packet->timestamp >= this->field->field_graph_start
                       && packet->timestamp <= this->field->field_graph_end;
    }

    // filter_interval can only be true when a field is attached, so the
    // dereferences of this->field below are safe.
    const bool building_graph = this->tag && filter_interval;
    if (!building_graph)
      return;

    this->filterID = InvalidableObject::filterIdGenerator++;
    packet->src_filterID = this->filterID;
    packet->field = this->field;
    packet->distance = 1;

    CWorkflowGraph::allocNodeEdge();
    CWorkflowGraph::addNode(this->filterID, "Source Filter ", 1, 1, 0, packet);

    (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].attributes = this->field->record4graphXiosAttributes();
    (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].field_id = this->field->getId();
    (*CWorkflowGraph::mapFilters_ptr_with_info)[this->filterID].distance = 1;

    CWorkflowGraph::build_begin = true;
  }

  /*!
   * Wraps client-provided data in a packet and sends it downstream.
   *
   * The packet is sized from grid->storeIndex_client and filled through
   * CGrid::uncompressField(), CGrid::maskField() or CGrid::inputField(),
   * depending on the compression and mask flags given at construction.
   *
   * \param date date of the data; the filter offset is added to it
   * \param data data to stream
   */
  template <int N>
  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
  {
    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter

    CDataPacketPtr packet(new CDataPacket);
    packet->date = date;
    packet->timestamp = date;
    packet->status = CDataPacket::NO_ERROR;

    packet->data.resize(grid->storeIndex_client.numElements());

    if (compression)
    {
      // Pre-fill with the default value before uncompressing into the packet.
      packet->data = defaultValue;
      grid->uncompressField(data, packet->data);
    }
    else
    {
      if (mask)
        grid->maskField(data, packet->data);
      else
        grid->inputField(data, packet->data);
    }

    // Convert missing values to NaN
    if (hasMissingValue)
      convertMissingValuesToNaN(packet->data, defaultValue);

    if (CXios::isClient) buildGraph(packet);

    onOutputReady(packet);
  }

  // Explicit instantiations for every supported array rank.
  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);

  /*!
   * Assembles the chunks received from the servers into one packet and sends
   * it downstream.
   *
   * Each chunk is scattered into the packet through the index array stored in
   * grid->storeIndex_fromSrv under the same key.
   *
   * \param date date of the data; the filter offset is added to it
   * \param data chunks to assemble, keyed like grid->storeIndex_fromSrv
   *
   * Raises an error through the ERROR macro when the number of chunks differs
   * from the number of entries in grid->storeIndex_fromSrv.
   */
  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
  {
    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter

    CDataPacketPtr packet(new CDataPacket);
    packet->date = date;
    packet->timestamp = date;
    packet->status = CDataPacket::NO_ERROR;

    if (data.size() != grid->storeIndex_fromSrv.size())
      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map >& data)",
            << "Incoherent data received from servers,"
            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");

    // NOTE(review): the packet is resized but not pre-filled, so any element
    // that no chunk writes to keeps whatever resize() left there -- confirm
    // that the server chunks always cover the whole client index.
    packet->data.resize(grid->storeIndex_client.numElements());

    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
    for (it = data.begin(); it != itEnd; ++it)
    {
      CArray<int, 1>& index = grid->storeIndex_fromSrv[it->first];
      const size_t nbIndex = index.numElements();
      for (size_t n = 0; n < nbIndex; ++n)
        packet->data(index(n)) = it->second(n);
    }

    // Convert missing values to NaN
    if (hasMissingValue)
      convertMissingValuesToNaN(packet->data, defaultValue);

    if (CXios::isClient) buildGraph(packet);

    onOutputReady(packet);
  }

  /*!
   * Sends an END_OF_STREAM packet, carrying no data, downstream.
   *
   * \param date date of the end of stream; the filter offset is added to it
   */
  void CSourceFilter::signalEndOfStream(CDate date)
  {
    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter

    CDataPacketPtr packet(new CDataPacket);
    packet->date = date;
    packet->timestamp = date;
    packet->status = CDataPacket::END_OF_STREAM;

    onOutputReady(packet);
  }
} // namespace xios