source: XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/source_filter.cpp @ 1960

Last change on this file since 1960 was 1930, checked in by ymipsl, 4 years ago

Big update on ongoing work related to data distribution and transfer between clients and servers.
Revisit of the source and store filters using "connectors".

YM

File size: 4.7 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5#include <limits>
6
7namespace xios
8{
9  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
10                               bool compression /*= true*/, bool mask /*= false*/,
11                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/,
12                               bool hasMissingValue /*= false*/,
13                               double defaultValue /*= 0.0*/)
14    : COutputPin(gc, manualTrigger)
15    , grid(grid)
16    , compression(compression)
17    , mask(mask)
18    , offset(offset)
19    , hasMissingValue(hasMissingValue), defaultValue(defaultValue)
20  {
21    if (!grid)
22      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
23            "Impossible to construct a source filter without providing a grid.");
24  }
25
26  template <int N>
27  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
28  {
29    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
30
31    CDataPacketPtr packet(new CDataPacket);
32    packet->date = date;
33    packet->timestamp = date;
34    packet->status = CDataPacket::NO_ERROR;
35
36    packet->data.resize(grid->getStoreIndex_client().numElements());   
37   
38    if (compression)
39    {
40      packet->data = defaultValue;
41      grid->uncompressField(data, packet->data);   
42     
43      // Convert missing values to NaN
44      if (hasMissingValue) // probably to removed later
45      {
46        const double nanValue = std::numeric_limits<double>::quiet_NaN();
47        const size_t nbData = packet->data.numElements();
48        for (size_t idx = 0; idx < nbData; ++idx)
49        {
50          if (defaultValue == packet->data(idx))
51          packet->data(idx) = nanValue;
52        }
53      }
54    }
55    else
56    {
57      if (mask) grid->maskField(data, packet->data);       // => ie coming from model
58      //else grid->inputField(data, packet->data);
59      else 
60      {
61        packet->data.resize(data.numElements()) ; // temporary solution, create own source filter for data coming from client
62        CArray<double,1> tmp( (double*)data.dataFirst(),shape(data.numElements()),duplicateData) ;
63        packet->data.reference(tmp) ;   // nothing to do if coming from client to server => workflow view
64      }
65    }
66
67    onOutputReady(packet);
68  }
69
70  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
71  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
72  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
73  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
74  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
75  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
76  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
77
78  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
79  {
80    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
81
82    CDataPacketPtr packet(new CDataPacket);
83    packet->date = date;
84    packet->timestamp = date;
85    packet->status = CDataPacket::NO_ERROR;
86   
87    if (data.size() != grid->storeIndex_fromSrv_.size())
88      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
89            << "Incoherent data received from servers,"
90            << " expected " << grid->storeIndex_fromSrv_.size() << " chunks but " << data.size() << " were given.");
91
92    packet->data.resize(grid->getStoreIndex_client().numElements());
93    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
94    for (it = data.begin(); it != itEnd; it++)
95    {     
96      CArray<int,1>& index = grid->storeIndex_fromSrv_[it->first];
97      for (int n = 0; n < index.numElements(); n++)
98        packet->data(index(n)) = it->second(n);
99    }
100
101    // Convert missing values to NaN
102    if (hasMissingValue)
103    {
104      const double nanValue = std::numeric_limits<double>::quiet_NaN();
105      const size_t nbData = packet->data.numElements();
106      for (size_t idx = 0; idx < nbData; ++idx)
107      {
108        if (defaultValue == packet->data(idx))
109          packet->data(idx) = nanValue;
110      }
111    }
112
113    onOutputReady(packet);
114  }
115
116  void CSourceFilter::signalEndOfStream(CDate date)
117  {
118    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
119
120    CDataPacketPtr packet(new CDataPacket);
121    packet->date = date;
122    packet->timestamp = date;
123    packet->status = CDataPacket::END_OF_STREAM;
124    onOutputReady(packet);
125  }
126} // namespace xios
Note: See TracBrowser for help on using the repository browser.