source: XIOS/trunk/src/filter/source_filter.cpp @ 1006

Last change on this file since 1006 was 1006, checked in by rlacroix, 7 years ago

The workflow is now triggered when using xios_recv_field for fields in read mode received from the servers.

Previously the workflow was triggered upon receiving the data, which could cause deadlocks since there are no guarantees that clients are receiving data at the same time.

File size: 3.0 KB
Line 
1#include "source_filter.hpp"
2#include "grid.hpp"
3#include "exception.hpp"
4#include "calendar_util.hpp"
5
6namespace xios
7{
8  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid,
9                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/)
10    : COutputPin(gc, manualTrigger)
11    , grid(grid)
12    , offset(offset)
13  {
14    if (!grid)
15      ERROR("CSourceFilter::CSourceFilter(CGrid* grid)",
16            "Impossible to construct a source filter without providing a grid.");
17  }
18
19  template <int N>
20  void CSourceFilter::streamData(CDate date, const CArray<double, N>& data)
21  {
22    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
23
24    CDataPacketPtr packet(new CDataPacket);
25    packet->date = date;
26    packet->timestamp = date;
27    packet->status = CDataPacket::NO_ERROR;
28
29    packet->data.resize(grid->storeIndex_client.numElements());
30    grid->inputField(data, packet->data);
31
32    onOutputReady(packet);
33  }
34
35  template void CSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data);
36  template void CSourceFilter::streamData<2>(CDate date, const CArray<double, 2>& data);
37  template void CSourceFilter::streamData<3>(CDate date, const CArray<double, 3>& data);
38  template void CSourceFilter::streamData<4>(CDate date, const CArray<double, 4>& data);
39  template void CSourceFilter::streamData<5>(CDate date, const CArray<double, 5>& data);
40  template void CSourceFilter::streamData<6>(CDate date, const CArray<double, 6>& data);
41  template void CSourceFilter::streamData<7>(CDate date, const CArray<double, 7>& data);
42
43  void CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)
44  {
45    date = date + offset; // this is a temporary solution, it should be part of a proper temporal filter
46
47    CDataPacketPtr packet(new CDataPacket);
48    packet->date = date;
49    packet->timestamp = date;
50    packet->status = CDataPacket::NO_ERROR;
51
52    // if (data.size() != grid->storeIndex_toSrv.size())
53    if (data.size() != grid->storeIndex_fromSrv.size())
54      ERROR("CSourceFilter::streamDataFromServer(CDate date, const std::map<int, CArray<double, 1> >& data)",
55            << "Incoherent data received from servers,"
56            << " expected " << grid->storeIndex_fromSrv.size() << " chunks but " << data.size() << " were given.");
57
58    packet->data.resize(grid->storeIndex_client.numElements());
59    std::map<int, CArray<double, 1> >::const_iterator it, itEnd = data.end();
60    for (it = data.begin(); it != itEnd; it++)
61    {
62      // CArray<int,1>& index = grid->storeIndex_toSrv[it->first];
63      CArray<int,1>& index = grid->storeIndex_fromSrv[it->first];
64      for (int n = 0; n < index.numElements(); n++)
65        packet->data(index(n)) = it->second(n);
66    }
67
68    onOutputReady(packet);
69  }
70
71  void CSourceFilter::signalEndOfStream(CDate date)
72  {
73    CDataPacketPtr packet(new CDataPacket);
74    packet->date = date;
75    packet->timestamp = date;
76    packet->status = CDataPacket::END_OF_STREAM;
77    onOutputReady(packet);
78  }
79} // namespace xios
Note: See TracBrowser for help on using the repository browser.