Changeset 1006
- Timestamp:
- 11/28/16 14:02:54 (8 years ago)
- Location:
- XIOS/trunk/src
- Files:
-
- 14 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/trunk/src/filter/filter.cpp
r827 r1006 5 5 CFilter::CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine) 6 6 : CInputPin(gc, inputSlotsCount) 7 , COutputPin( )7 , COutputPin(gc) 8 8 , engine(engine) 9 9 , inputSlotCount(inputSlotCount) … … 14 14 CDataPacketPtr outputPacket = engine->apply(data); 15 15 if (outputPacket) 16 deliverOuput(outputPacket); 16 onOutputReady(outputPacket); 17 } 18 19 void CFilter::setInputTrigger(size_t inputSlot, COutputPin* trigger) 20 { 21 // Was the filter already triggerable? If not, we need to inform 22 // all downstream filters. 23 bool wasTriggerable = canBeTriggered(); 24 25 CInputPin::setInputTrigger(inputSlot, trigger); 26 27 if (!wasTriggerable) 28 setOutputTriggers(); 29 } 30 31 void CFilter::trigger(Time timestamp) 32 { 33 CInputPin::trigger(timestamp); 34 35 COutputPin::trigger(timestamp); 36 } 37 38 bool CFilter::canBeTriggered() const 39 { 40 return (CInputPin::canBeTriggered() || COutputPin::canBeTriggered()); 17 41 } 18 42 } // namespace xios -
XIOS/trunk/src/filter/filter.hpp
r827 r1006 26 26 CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine); 27 27 28 /*! 29 * Sets the trigger for a specific input slot. 30 * 31 * \param inputSlot the input slot number 32 * \param trigger the corresponding trigger 33 */ 34 void virtual setInputTrigger(size_t inputSlot, COutputPin* trigger); 35 36 /*! 37 * Triggers the filter for the specified timestamp. 38 * 39 * \param timestamp the timestamp for which we are triggering the filter 40 */ 41 void virtual trigger(Time timestamp); 42 43 /*! 44 * Tests if the filter can be triggered. 45 * 46 * \return true if the filter can be triggered 47 */ 48 bool virtual canBeTriggered() const; 49 28 50 protected: 29 51 IFilterEngine* engine; //!< The filter engine, might be the filter itself -
XIOS/trunk/src/filter/garbage_collector.cpp
r639 r1006 3 3 namespace xios 4 4 { 5 void CGarbageCollector::register Filter(CInputPin* inputPin, Time timestamp)5 void CGarbageCollector::registerObject(InvalidableObject* Object, Time timestamp) 6 6 { 7 registered Filters[timestamp].insert(inputPin);7 registeredObjects[timestamp].insert(Object); 8 8 } 9 9 10 void CGarbageCollector::unregister Filter(CInputPin* inputPin, Time timestamp)10 void CGarbageCollector::unregisterObject(InvalidableObject* Object, Time timestamp) 11 11 { 12 std::map<Time, std::set< CInputPin*> >::iterator it = registeredFilters.find(timestamp);13 if (it != registered Filters.end())14 it->second.erase( inputPin);12 std::map<Time, std::set<InvalidableObject*> >::iterator it = registeredObjects.find(timestamp); 13 if (it != registeredObjects.end()) 14 it->second.erase(Object); 15 15 } 16 16 17 17 void CGarbageCollector::invalidate(Time timestamp) 18 18 { 19 std::map<Time, std::set< CInputPin*> >::iterator it = registeredFilters.begin(),20 itEnd = registeredFilters.lower_bound(timestamp);19 std::map<Time, std::set<InvalidableObject*> >::iterator it = registeredObjects.begin(), 20 itEnd = registeredObjects.lower_bound(timestamp); 21 21 for (; it != itEnd; ++it) 22 22 { 23 std::set< CInputPin*>::iterator itFilter= it->second.begin(),24 itFilterEnd = it->second.end();25 for (; it Filter != itFilterEnd; ++itFilter)26 (*it Filter)->invalidate(timestamp);23 std::set<InvalidableObject*>::iterator itObject = it->second.begin(), 24 itObjectEnd = it->second.end(); 25 for (; itObject != itObjectEnd; ++itObject) 26 (*itObject)->invalidate(timestamp); 27 27 } 28 registered Filters.erase(registeredFilters.begin(), itEnd);28 registeredObjects.erase(registeredObjects.begin(), itEnd); 29 29 } 30 30 } // namespace xios -
XIOS/trunk/src/filter/garbage_collector.hpp
r639 r1006 5 5 #include <set> 6 6 7 #include " input_pin.hpp"7 #include "date.hpp" 8 8 9 9 namespace xios 10 10 { 11 /*! 12 * Interface shared by all objects that might need to invalidate packets. 13 */ 14 struct InvalidableObject 15 { 16 /*! 17 * Removes all pending packets which are older than the specified timestamp. 18 * 19 * \param timestamp the timestamp used for invalidation 20 */ 21 void virtual invalidate(Time timestamp) = 0; 22 }; // struct InvalidableObject 23 11 24 /*! 12 25 * A basic garbage collector which ensures no old packets linger in the filter graph. … … 22 35 23 36 /*! 24 * Registers a filterfor a specified timestamp.37 * Registers an object for a specified timestamp. 25 38 * 26 * \param inputPin the input pin of the filterto register27 * \param timestamp the timestamp for which the filteris registered39 * \param object the object to register 40 * \param timestamp the timestamp for which the object is registered 28 41 */ 29 void register Filter(CInputPin* inputPin, Time timestamp);42 void registerObject(InvalidableObject* object, Time timestamp); 30 43 31 44 /*! 32 * Removes a filterpreviously registered for a specified timestamp.45 * Removes a object previously registered for a specified timestamp. 33 46 * 34 * \param inputPin the input pin of the filterto unregister35 * \param timestamp the timestamp for which the filteris unregistered47 * \param object the object to unregister 48 * \param timestamp the timestamp for which the object is unregistered 36 49 */ 37 void unregister Filter(CInputPin* inputPin, Time timestamp);50 void unregisterObject(InvalidableObject* object, Time timestamp); 38 51 39 52 /*! 40 * Ensures all registered filters invalidate packets older than the specified timestamp.53 * Ensures all registered objects invalidate packets older than the specified timestamp. 41 54 * 42 55 * \param timestamp the timestamp used for invalidation … … 48 61 CGarbageCollector& operator=(const CGarbageCollector&); 49 62 50 std::map<Time, std::set< CInputPin*> > registeredFilters; //!< Currently registered filters63 std::map<Time, std::set<InvalidableObject*> > registeredObjects; //!< Currently registered objects 51 64 }; // class CGarbageCollector 52 65 } // namespace xios -
XIOS/trunk/src/filter/input_pin.cpp
r639 r1006 1 1 #include "input_pin.hpp" 2 #include "output_pin.hpp" 2 3 #include "garbage_collector.hpp" 3 4 #include "exception.hpp" … … 8 9 : gc(gc) 9 10 , slotsCount(slotsCount) 11 , triggers(slotsCount) 12 , hasTriggers(false) 10 13 { /* Nothing to do */ } 11 14 … … 23 26 { 24 27 it = inputs.insert(std::make_pair(packet->timestamp, InputBuffer(slotsCount))).first; 25 gc.register Filter(this, packet->timestamp);28 gc.registerObject(this, packet->timestamp); 26 29 } 27 30 it->second.slotsFilled++; … … 31 34 { 32 35 // Unregister before calling onInputReady in case the filter registers again 33 gc.unregister Filter(this, packet->timestamp);36 gc.unregisterObject(this, packet->timestamp); 34 37 onInputReady(it->second.packets); 35 38 inputs.erase(it); 36 39 } 40 } 41 42 void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger) 43 { 44 if (inputSlot >= slotsCount) 45 ERROR("void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger)", 46 "The input slot " << inputSlot << " does not exist."); 47 if (triggers[inputSlot]) 48 ERROR("void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger)", 49 "The trigger for input slot " << inputSlot << " has already been set."); 50 51 triggers[inputSlot] = trigger; 52 hasTriggers = true; 53 } 54 55 void CInputPin::trigger(Time timestamp) 56 { 57 if (hasTriggers) // Don't use canBeTriggered here, this function is virtual and can be overriden 58 { 59 std::map<Time, InputBuffer>::iterator it = inputs.find(timestamp); 60 bool nothingReceived = (it == inputs.end()); 61 62 for (size_t s = 0; s < slotsCount; s++) 63 { 64 if (triggers[s] && (nothingReceived || !it->second.packets[s])) 65 triggers[s]->trigger(timestamp); 66 } 67 } 68 } 69 70 bool CInputPin::canBeTriggered() const 71 { 72 return hasTriggers; 37 73 } 38 74 -
XIOS/trunk/src/filter/input_pin.hpp
r639 r1006 5 5 #include <map> 6 6 7 #include "garbage_collector.hpp" 7 8 #include "data_packet.hpp" 8 9 9 10 namespace xios 10 11 { 11 class C GarbageCollector;12 class COutputPin; 12 13 13 14 /*! 14 15 * An input pin handles the data packets received by a filter. 15 16 */ 16 class CInputPin 17 class CInputPin : public InvalidableObject 17 18 { 18 19 public: … … 27 28 28 29 /*! 30 * Sets the trigger for a specific input slot. 31 * 32 * \param inputSlot the input slot number 33 * \param trigger the corresponding trigger 34 */ 35 void virtual setInputTrigger(size_t inputSlot, COutputPin* trigger); 36 37 /*! 29 38 * Receives a data packet from an upstream filter on 30 39 * the specified input slot. … … 37 46 38 47 /*! 48 * Triggers the input of any buffered packet for the specified timestamp. 49 * 50 * \param timestamp the timestamp for which we are triggering the input 51 */ 52 void virtual trigger(Time timestamp); 53 54 /*! 55 * Tests if the pin can be triggered. 56 * 57 * \return true if the pin can be triggered 58 */ 59 bool virtual canBeTriggered() const; 60 61 /*! 39 62 * Removes all pending packets which are older than the specified timestamp. 40 63 * … … 44 67 45 68 protected: 46 CGarbageCollector& gc; //!< The garbage collector associated to the input pin47 48 69 /*! 49 70 * Function triggered when all slots have been filled for a specific timestamp. … … 75 96 }; 76 97 98 CGarbageCollector& gc; //!< The garbage collector associated to the input pin 99 77 100 size_t slotsCount; //!< The number of slots 78 101 79 102 //! Input buffer, store the packets until all slots are full for a timestep 80 103 std::map<Time, InputBuffer> inputs; 104 105 //! Store the triggers corresponding to the input slots 106 std::vector<COutputPin*> triggers; 107 108 //! Whether some triggers have been set 109 bool hasTriggers; 81 110 }; // class CInputPin 82 111 } // namespace xios -
XIOS/trunk/src/filter/output_pin.cpp
r637 r1006 4 4 namespace xios 5 5 { 6 COutputPin::COutputPin(CGarbageCollector& gc, bool manualTrigger /*= false*/) 7 : gc(gc) 8 , manualTrigger(manualTrigger) 9 { /* Nothing to do */ } 10 6 11 void COutputPin::connectOutput(boost::shared_ptr<CInputPin> inputPin, size_t inputSlot) 7 12 { … … 11 16 12 17 outputs.push_back(std::make_pair(inputPin, inputSlot)); 18 19 if (canBeTriggered()) 20 inputPin->setInputTrigger(inputSlot, this); 21 } 22 23 void COutputPin::onOutputReady(CDataPacketPtr packet) 24 { 25 if (!packet) 26 ERROR("void COutputPin::onOutputReady(CDataPacketPtr packet)", 27 "The packet cannot be null."); 28 29 if (manualTrigger) // Don't use canBeTriggered here, this function is virtual and can be overriden 30 { 31 outputPackets[packet->timestamp] = packet; 32 gc.registerObject(this, packet->timestamp); 33 } 34 else 35 deliverOuput(packet); 13 36 } 14 37 … … 23 46 it->first->setInput(it->second, packet); 24 47 } 48 49 void COutputPin::trigger(Time timestamp) 50 { 51 if (manualTrigger) // Don't use canBeTriggered here, this function is virtual and can be overriden 52 { 53 std::map<Time, CDataPacketPtr>::iterator it = outputPackets.find(timestamp); 54 if (it != outputPackets.end()) 55 { 56 gc.unregisterObject(this, timestamp); 57 deliverOuput(it->second); 58 outputPackets.erase(it); 59 } 60 } 61 } 62 63 bool COutputPin::canBeTriggered() const 64 { 65 return manualTrigger; 66 } 67 68 void COutputPin::setOutputTriggers() 69 { 70 std::vector<std::pair<boost::shared_ptr<CInputPin>, size_t> >::iterator it, itEnd; 71 for (it = outputs.begin(), itEnd = outputs.end(); it != itEnd; ++it) 72 it->first->setInputTrigger(it->second, this); 73 } 74 75 void COutputPin::invalidate(Time timestamp) 76 { 77 outputPackets.erase(outputPackets.begin(), outputPackets.lower_bound(timestamp)); 78 } 25 79 } // namespace xios -
XIOS/trunk/src/filter/output_pin.hpp
r637 r1006 2 2 #define __XIOS_COutputPin__ 3 3 4 #include "garbage_collector.hpp" 4 5 #include "input_pin.hpp" 5 6 … … 9 10 * An output pin handles the connections with downstream filters. 10 11 */ 11 class COutputPin 12 class COutputPin : public InvalidableObject 12 13 { 13 14 public: 15 /*! 16 * Constructs an ouput pin with manual or automatic trigger 17 * and an associated garbage collector. 18 * 19 * \param gc the garbage collector associated with this ouput pin 20 * \param slotsCount the number of slots 21 */ 22 COutputPin(CGarbageCollector& gc, bool manualTrigger = false); 23 14 24 /*! 15 25 * Connects to a specific slot of the input pin of a downstream filter. … … 21 31 void connectOutput(boost::shared_ptr<CInputPin> inputPin, size_t inputSlot); 22 32 33 /*! 34 * Triggers the output of any buffered packet for the specified timestamp. 35 * 36 * \param timestamp the timestamp for which we are triggering the output 37 */ 38 void virtual trigger(Time timestamp); 39 40 /*! 41 * Tests if the pin can be triggered. 42 * 43 * \return true if the pin can be triggered 44 */ 45 bool virtual canBeTriggered() const; 46 47 /*! 48 * Removes all pending packets which are older than the specified timestamp. 49 * 50 * \param timestamp the timestamp used for invalidation 51 */ 52 void virtual invalidate(Time timestamp); 53 23 54 protected: 55 /*! 56 * Function triggered when a packet is ready to be delivered. 57 * 58 * \param packet the packet ready for output 59 */ 60 void onOutputReady(CDataPacketPtr packet); 61 62 /*! 63 * Informs the downstream pins that this output pin should be triggered. 64 */ 65 void setOutputTriggers(); 66 67 private: 24 68 /*! 25 69 * Delivers an output packet to the downstreams filter. … … 29 73 void deliverOuput(CDataPacketPtr packet); 30 74 31 private: 75 CGarbageCollector& gc; //!< The garbage collector associated to the output pin 76 77 //!< Whether the ouput should be triggered manually 78 bool manualTrigger; 79 32 80 //!< The list of connected filters and the corresponding slot numbers 33 81 std::vector<std::pair<boost::shared_ptr<CInputPin>, size_t> > outputs; 82 83 //! Output buffer, store the packets until the output is triggered 84 std::map<Time, CDataPacketPtr> outputPackets; 34 85 }; // class COutputPin 35 86 } // namespace xios -
XIOS/trunk/src/filter/source_filter.cpp
r988 r1006 6 6 namespace xios 7 7 { 8 CSourceFilter::CSourceFilter(CGrid* grid, const CDuration offset /*= NoneDu*/) 9 : grid(grid) 8 CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, 9 const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/) 10 : COutputPin(gc, manualTrigger) 11 , grid(grid) 10 12 , offset(offset) 11 13 { … … 28 30 grid->inputField(data, packet->data); 29 31 30 deliverOuput(packet);32 onOutputReady(packet); 31 33 } 32 34 … … 64 66 } 65 67 66 deliverOuput(packet);68 onOutputReady(packet); 67 69 } 68 70 … … 73 75 packet->timestamp = date; 74 76 packet->status = CDataPacket::END_OF_STREAM; 75 deliverOuput(packet);77 onOutputReady(packet); 76 78 } 77 79 } // namespace xios -
XIOS/trunk/src/filter/source_filter.hpp
r756 r1006 19 19 * Constructs a source filter accepting data attached to the specified grid. 20 20 * 21 * \param gc the garbage collector associated with this filter 21 22 * \param grid the grid to which the data is attached 22 23 * \param offset the offset applied to the timestamp of all packets 24 * \param manualTrigger whether the output should be triggered manually 23 25 */ 24 CSourceFilter(CGrid* grid, const CDuration offset = NoneDu); 26 CSourceFilter(CGarbageCollector& gc, CGrid* grid, 27 const CDuration offset = NoneDu, bool manualTrigger = false); 25 28 26 29 /*! -
XIOS/trunk/src/filter/spatial_transform_filter.cpp
r1003 r1006 53 53 CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue); 54 54 if (outputPacket) 55 deliverOuput(outputPacket);55 onOutputReady(outputPacket); 56 56 } 57 57 -
XIOS/trunk/src/filter/store_filter.cpp
r997 r1006 8 8 CStoreFilter::CStoreFilter(CGarbageCollector& gc, CContext* context, CGrid* grid) 9 9 : CInputPin(gc, 1) 10 , gc(gc) 10 11 , context(context) 11 12 , grid(grid) … … 27 28 do 28 29 { 30 if (canBeTriggered()) 31 trigger(timestamp); 32 29 33 timer.resume(); 30 34 … … 74 78 // The packet is always destroyed by the garbage collector 75 79 // so we register but never unregister 76 gc.register Filter(this, data[0]->timestamp);80 gc.registerObject(this, data[0]->timestamp); 77 81 } 78 82 -
XIOS/trunk/src/filter/store_filter.hpp
r639 r1006 65 65 66 66 private: 67 CGarbageCollector& gc; //!< The garbage collector associated to the filter 67 68 CContext* context; //!< The context to which the data belongs 68 69 CGrid* grid; //!< The grid attached to the data the filter can accept -
XIOS/trunk/src/node/field.cpp
r1003 r1006 800 800 // Check if the data is to be read from a file 801 801 else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 802 instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid, 803 freq_offset.isEmpty() ? NoneDu : freq_offset)); 802 instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 803 freq_offset.isEmpty() ? NoneDu : freq_offset, 804 true)); 804 805 else // The data might be passed from the model 805 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(g rid));806 instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 806 807 } 807 808 … … 874 875 { 875 876 if (!serverSourceFilter) 876 serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid, 877 freq_offset.isEmpty() ? NoneDu : freq_offset)); 877 serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 878 freq_offset.isEmpty() ? NoneDu : freq_offset, 879 true)); 878 880 879 881 selfReferenceFilter = serverSourceFilter; … … 888 890 { 889 891 if (!clientSourceFilter) 890 clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(g rid));892 clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 891 893 892 894 selfReferenceFilter = clientSourceFilter;
Note: See TracChangeset
for help on using the changeset viewer.