#include "source_filter.hpp" #include "grid.hpp" #include "exception.hpp" #include "calendar_util.hpp" #include namespace xios { CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/, bool hasMissingValue /*= false*/, double defaultValue /*= 0.0*/) : COutputPin(gc, manualTrigger) , grid(grid) , offset(offset) , hasMissingValue(hasMissingValue), defaultValue(defaultValue) { if (!grid) ERROR("CSourceFilter::CSourceFilter(CGrid* grid)", "Impossible to construct a source filter without providing a grid."); } template void CSourceFilter::streamData(CDate date, const CArray& data) { date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter CDataPacketPtr packet(new CDataPacket); packet->date = date; packet->timestamp = date; packet->status = CDataPacket::NO_ERROR; packet->data.resize(grid->storeIndex_client.numElements()); grid->inputField(data, packet->data); // Convert missing values to NaN if (hasMissingValue) { const double nanValue = std::numeric_limits::quiet_NaN(); const size_t nbData = packet->data.numElements(); for (size_t idx = 0; idx < nbData; ++idx) { if (defaultValue == packet->data(idx)) packet->data(idx) = nanValue; } } onOutputReady(packet); } template void CSourceFilter::streamData<1>(CDate date, const CArray& data); template void CSourceFilter::streamData<2>(CDate date, const CArray& data); template void CSourceFilter::streamData<3>(CDate date, const CArray& data); template void CSourceFilter::streamData<4>(CDate date, const CArray& data); template void CSourceFilter::streamData<5>(CDate date, const CArray& data); template void CSourceFilter::streamData<6>(CDate date, const CArray& data); template void CSourceFilter::streamData<7>(CDate date, const CArray& data); void CSourceFilter::streamDataFromServer(CDate date, const std::map >& data) { date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter CDataPacketPtr packet(new CDataPacket); packet->date = date; packet->timestamp = date; packet->status = CDataPacket::NO_ERROR; // if (data.size() != grid->storeIndex_toSrv.size()) if (data.size() != grid->storeIndex_fromSrv.size()) ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map >& data)", << "Incoherent data received from servers," << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given."); packet->data.resize(grid->storeIndex_client.numElements()); std::map >::const_iterator it, itEnd = data.end(); for (it = data.begin(); it != itEnd; it++) { // CArray& index = grid->storeIndex_toSrv[it->first]; CArray& index = grid->storeIndex_fromSrv[it->first]; for (int n = 0; n < index.numElements(); n++) packet->data(index(n)) = it->second(n); } // Convert missing values to NaN if (hasMissingValue) { const double nanValue = std::numeric_limits::quiet_NaN(); const size_t nbData = packet->data.numElements(); for (size_t idx = 0; idx < nbData; ++idx) { if (defaultValue == packet->data(idx)) packet->data(idx) = nanValue; } } onOutputReady(packet); } void CSourceFilter::signalEndOfStream(CDate date) { date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter CDataPacketPtr packet(new CDataPacket); packet->date = date; packet->timestamp = date; packet->status = CDataPacket::END_OF_STREAM; onOutputReady(packet); } } // namespace xios