source: XIOS/trunk/src/filter/input_pin.cpp @ 1006

Last change on this file since 1006 was 1006, checked in by rlacroix, 7 years ago

The workflow is now triggered when using xios_recv_field for fields in read mode received from the servers.

Previously, the workflow was triggered upon receiving the data, which could cause deadlocks since there is no guarantee that clients receive data at the same time.

File size: 2.5 KB
Line 
1#include "input_pin.hpp"
2#include "output_pin.hpp"
3#include "garbage_collector.hpp"
4#include "exception.hpp"
5
6namespace xios
7{
8  CInputPin::CInputPin(CGarbageCollector& gc, size_t slotsCount)
9    : gc(gc)
10    , slotsCount(slotsCount)
11    , triggers(slotsCount)
12    , hasTriggers(false)
13  { /* Nothing to do */ }
14
15  void CInputPin::setInput(size_t inputSlot, CDataPacketPtr packet)
16  {
17    if (inputSlot >= slotsCount)
18      ERROR("void CInputPin::setInput(size_t inputSlot, CDataPacketPtr packet)",
19            "The input slot " << inputSlot << " does not exist.");
20    if (!packet)
21      ERROR("void CInputPin::setInput(size_t inputSlot, CDataPacketPtr packet)",
22            "The packet cannot be null.");
23
24    std::map<Time, InputBuffer>::iterator it = inputs.find(packet->timestamp);
25    if (it == inputs.end())
26    {
27      it = inputs.insert(std::make_pair(packet->timestamp, InputBuffer(slotsCount))).first;
28      gc.registerObject(this, packet->timestamp);
29    }
30    it->second.slotsFilled++;
31    it->second.packets[inputSlot] = packet;
32
33    if (it->second.slotsFilled == slotsCount)
34    {
35      // Unregister before calling onInputReady in case the filter registers again
36      gc.unregisterObject(this, packet->timestamp);
37      onInputReady(it->second.packets);
38      inputs.erase(it);
39    }
40  }
41
42  void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger)
43  {
44    if (inputSlot >= slotsCount)
45      ERROR("void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger)",
46            "The input slot " << inputSlot << " does not exist.");
47    if (triggers[inputSlot])
48      ERROR("void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger)",
49            "The trigger for input slot " << inputSlot << " has already been set.");
50
51    triggers[inputSlot] = trigger;
52    hasTriggers = true;
53  }
54
55  void CInputPin::trigger(Time timestamp)
56  {
57    if (hasTriggers) // Don't use canBeTriggered here, this function is virtual and can be overriden
58    {
59      std::map<Time, InputBuffer>::iterator it = inputs.find(timestamp);
60      bool nothingReceived = (it == inputs.end());
61
62      for (size_t s = 0; s < slotsCount; s++)
63      {
64        if (triggers[s] && (nothingReceived || !it->second.packets[s]))
65          triggers[s]->trigger(timestamp);
66      }
67    }
68  }
69
70  bool CInputPin::canBeTriggered() const
71  {
72    return hasTriggers;
73  }
74
75  void CInputPin::invalidate(Time timestamp)
76  {
77    inputs.erase(inputs.begin(), inputs.lower_bound(timestamp));
78  }
79} // namespace xios
Note: See TracBrowser for help on using the repository browser.