Changeset 1006 for XIOS/trunk/src


Ignore:
Timestamp:
11/28/16 14:02:54 (7 years ago)
Author:
rlacroix
Message:

The workflow is now triggered when using xios_recv_field for fields in read mode received from the servers.

Previously the workflow was triggered upon receiving the data which could cause deadlocks since there are no garanties that clients are receiving data at the same time.

Location:
XIOS/trunk/src
Files:
14 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/filter/filter.cpp

    r827 r1006  
    55  CFilter::CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine) 
    66    : CInputPin(gc, inputSlotsCount) 
    7     , COutputPin() 
     7    , COutputPin(gc) 
    88    , engine(engine) 
    99    , inputSlotCount(inputSlotCount) 
     
    1414    CDataPacketPtr outputPacket = engine->apply(data); 
    1515    if (outputPacket) 
    16       deliverOuput(outputPacket); 
     16      onOutputReady(outputPacket); 
     17  } 
     18 
     19  void CFilter::setInputTrigger(size_t inputSlot, COutputPin* trigger) 
     20  { 
     21    // Was the filter already triggerable? If not, we need to inform 
     22    // all downstream filters. 
     23    bool wasTriggerable = canBeTriggered(); 
     24 
     25    CInputPin::setInputTrigger(inputSlot, trigger); 
     26 
     27    if (!wasTriggerable) 
     28      setOutputTriggers(); 
     29  } 
     30 
     31  void CFilter::trigger(Time timestamp) 
     32  { 
     33    CInputPin::trigger(timestamp); 
     34 
     35    COutputPin::trigger(timestamp); 
     36  } 
     37 
     38  bool CFilter::canBeTriggered() const 
     39  { 
     40    return (CInputPin::canBeTriggered() || COutputPin::canBeTriggered()); 
    1741  } 
    1842} // namespace xios 
  • XIOS/trunk/src/filter/filter.hpp

    r827 r1006  
    2626      CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine); 
    2727 
     28      /*! 
     29       * Sets the trigger for a specific input slot. 
     30       * 
     31       * \param inputSlot the input slot number 
     32       * \param trigger the corresponding trigger 
     33       */ 
     34      void virtual setInputTrigger(size_t inputSlot, COutputPin* trigger); 
     35 
     36      /*! 
     37       * Triggers the filter for the specified timestamp. 
     38       * 
     39       * \param timestamp the timestamp for which we are triggering the filter 
     40       */ 
     41      void virtual trigger(Time timestamp); 
     42 
     43      /*! 
     44       * Tests if the filter can be triggered. 
     45       * 
     46       * \return true if the filter can be triggered 
     47       */ 
     48      bool virtual canBeTriggered() const; 
     49 
    2850    protected: 
    2951      IFilterEngine* engine; //!< The filter engine, might be the filter itself 
  • XIOS/trunk/src/filter/garbage_collector.cpp

    r639 r1006  
    33namespace xios 
    44{ 
    5   void CGarbageCollector::registerFilter(CInputPin* inputPin, Time timestamp) 
     5  void CGarbageCollector::registerObject(InvalidableObject* Object, Time timestamp) 
    66  { 
    7     registeredFilters[timestamp].insert(inputPin); 
     7    registeredObjects[timestamp].insert(Object); 
    88  } 
    99 
    10   void CGarbageCollector::unregisterFilter(CInputPin* inputPin, Time timestamp) 
     10  void CGarbageCollector::unregisterObject(InvalidableObject* Object, Time timestamp) 
    1111  { 
    12     std::map<Time, std::set<CInputPin*> >::iterator it = registeredFilters.find(timestamp); 
    13     if (it != registeredFilters.end()) 
    14       it->second.erase(inputPin); 
     12    std::map<Time, std::set<InvalidableObject*> >::iterator it = registeredObjects.find(timestamp); 
     13    if (it != registeredObjects.end()) 
     14      it->second.erase(Object); 
    1515  } 
    1616 
    1717  void CGarbageCollector::invalidate(Time timestamp) 
    1818  { 
    19     std::map<Time, std::set<CInputPin*> >::iterator it    = registeredFilters.begin(), 
    20                                                     itEnd = registeredFilters.lower_bound(timestamp); 
     19    std::map<Time, std::set<InvalidableObject*> >::iterator it    = registeredObjects.begin(), 
     20                                                            itEnd = registeredObjects.lower_bound(timestamp); 
    2121    for (; it != itEnd; ++it) 
    2222    { 
    23       std::set<CInputPin*>::iterator itFilter    = it->second.begin(), 
    24                                      itFilterEnd = it->second.end(); 
    25       for (; itFilter != itFilterEnd; ++itFilter) 
    26         (*itFilter)->invalidate(timestamp); 
     23      std::set<InvalidableObject*>::iterator itObject    = it->second.begin(), 
     24                                             itObjectEnd = it->second.end(); 
     25      for (; itObject != itObjectEnd; ++itObject) 
     26        (*itObject)->invalidate(timestamp); 
    2727    } 
    28     registeredFilters.erase(registeredFilters.begin(), itEnd); 
     28    registeredObjects.erase(registeredObjects.begin(), itEnd); 
    2929  } 
    3030} // namespace xios 
  • XIOS/trunk/src/filter/garbage_collector.hpp

    r639 r1006  
    55#include <set> 
    66 
    7 #include "input_pin.hpp" 
     7#include "date.hpp" 
    88 
    99namespace xios 
    1010{ 
     11  /*! 
     12   * Interface shared by all objects that might need to invalidate packets. 
     13   */ 
     14  struct InvalidableObject 
     15  { 
     16    /*! 
     17     * Removes all pending packets which are older than the specified timestamp. 
     18     * 
     19     * \param timestamp the timestamp used for invalidation 
     20     */ 
     21    void virtual invalidate(Time timestamp) = 0; 
     22  }; // struct InvalidableObject 
     23 
    1124  /*! 
    1225   * A basic garbage collector which ensures no old packets linger in the filter graph. 
     
    2235 
    2336      /*! 
    24        * Registers a filter for a specified timestamp. 
     37       * Registers an object for a specified timestamp. 
    2538       * 
    26        * \param inputPin the input pin of the filter to register 
    27        * \param timestamp the timestamp for which the filter is registered 
     39       * \param object the object to register 
     40       * \param timestamp the timestamp for which the object is registered 
    2841       */ 
    29       void registerFilter(CInputPin* inputPin, Time timestamp); 
     42      void registerObject(InvalidableObject* object, Time timestamp); 
    3043 
    3144      /*! 
    32        * Removes a filter previously registered for a specified timestamp. 
     45       * Removes a object previously registered for a specified timestamp. 
    3346       * 
    34        * \param inputPin the input pin of the filter to unregister 
    35        * \param timestamp the timestamp for which the filter is unregistered 
     47       * \param object the object to unregister 
     48       * \param timestamp the timestamp for which the object is unregistered 
    3649       */ 
    37       void unregisterFilter(CInputPin* inputPin, Time timestamp); 
     50      void unregisterObject(InvalidableObject* object, Time timestamp); 
    3851 
    3952      /*! 
    40        * Ensures all registered filters invalidate packets older than the specified timestamp. 
     53       * Ensures all registered objects invalidate packets older than the specified timestamp. 
    4154       * 
    4255       * \param timestamp the timestamp used for invalidation 
     
    4861      CGarbageCollector& operator=(const CGarbageCollector&); 
    4962 
    50       std::map<Time, std::set<CInputPin*> > registeredFilters; //!< Currently registered filters 
     63      std::map<Time, std::set<InvalidableObject*> > registeredObjects; //!< Currently registered objects 
    5164  }; // class CGarbageCollector 
    5265} // namespace xios 
  • XIOS/trunk/src/filter/input_pin.cpp

    r639 r1006  
    11#include "input_pin.hpp" 
     2#include "output_pin.hpp" 
    23#include "garbage_collector.hpp" 
    34#include "exception.hpp" 
     
    89    : gc(gc) 
    910    , slotsCount(slotsCount) 
     11    , triggers(slotsCount) 
     12    , hasTriggers(false) 
    1013  { /* Nothing to do */ } 
    1114 
     
    2326    { 
    2427      it = inputs.insert(std::make_pair(packet->timestamp, InputBuffer(slotsCount))).first; 
    25       gc.registerFilter(this, packet->timestamp); 
     28      gc.registerObject(this, packet->timestamp); 
    2629    } 
    2730    it->second.slotsFilled++; 
     
    3134    { 
    3235      // Unregister before calling onInputReady in case the filter registers again 
    33       gc.unregisterFilter(this, packet->timestamp); 
     36      gc.unregisterObject(this, packet->timestamp); 
    3437      onInputReady(it->second.packets); 
    3538      inputs.erase(it); 
    3639    } 
     40  } 
     41 
     42  void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger) 
     43  { 
     44    if (inputSlot >= slotsCount) 
     45      ERROR("void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger)", 
     46            "The input slot " << inputSlot << " does not exist."); 
     47    if (triggers[inputSlot]) 
     48      ERROR("void CInputPin::setInputTrigger(size_t inputSlot, COutputPin* trigger)", 
     49            "The trigger for input slot " << inputSlot << " has already been set."); 
     50 
     51    triggers[inputSlot] = trigger; 
     52    hasTriggers = true; 
     53  } 
     54 
     55  void CInputPin::trigger(Time timestamp) 
     56  { 
     57    if (hasTriggers) // Don't use canBeTriggered here, this function is virtual and can be overriden 
     58    { 
     59      std::map<Time, InputBuffer>::iterator it = inputs.find(timestamp); 
     60      bool nothingReceived = (it == inputs.end()); 
     61 
     62      for (size_t s = 0; s < slotsCount; s++) 
     63      { 
     64        if (triggers[s] && (nothingReceived || !it->second.packets[s])) 
     65          triggers[s]->trigger(timestamp); 
     66      } 
     67    } 
     68  } 
     69 
     70  bool CInputPin::canBeTriggered() const 
     71  { 
     72    return hasTriggers; 
    3773  } 
    3874 
  • XIOS/trunk/src/filter/input_pin.hpp

    r639 r1006  
    55#include <map> 
    66 
     7#include "garbage_collector.hpp" 
    78#include "data_packet.hpp" 
    89 
    910namespace xios 
    1011{ 
    11   class CGarbageCollector; 
     12  class COutputPin; 
    1213 
    1314  /*! 
    1415   * An input pin handles the data packets received by a filter. 
    1516   */ 
    16   class CInputPin 
     17  class CInputPin : public InvalidableObject 
    1718  { 
    1819    public: 
     
    2728 
    2829      /*! 
     30       * Sets the trigger for a specific input slot. 
     31       * 
     32       * \param inputSlot the input slot number 
     33       * \param trigger the corresponding trigger 
     34       */ 
     35      void virtual setInputTrigger(size_t inputSlot, COutputPin* trigger); 
     36 
     37      /*! 
    2938       * Receives a data packet from an upstream filter on 
    3039       * the specified input slot. 
     
    3746 
    3847      /*! 
     48       * Triggers the input of any buffered packet for the specified timestamp. 
     49       * 
     50       * \param timestamp the timestamp for which we are triggering the input 
     51       */ 
     52      void virtual trigger(Time timestamp); 
     53 
     54      /*! 
     55       * Tests if the pin can be triggered. 
     56       * 
     57       * \return true if the pin can be triggered 
     58       */ 
     59      bool virtual canBeTriggered() const; 
     60 
     61      /*! 
    3962       * Removes all pending packets which are older than the specified timestamp. 
    4063       * 
     
    4467 
    4568    protected: 
    46       CGarbageCollector& gc; //!< The garbage collector associated to the input pin 
    47  
    4869      /*! 
    4970       * Function triggered when all slots have been filled for a specific timestamp. 
     
    7596      }; 
    7697 
     98      CGarbageCollector& gc; //!< The garbage collector associated to the input pin 
     99 
    77100      size_t slotsCount; //!< The number of slots 
    78101 
    79102      //! Input buffer, store the packets until all slots are full for a timestep 
    80103      std::map<Time, InputBuffer> inputs; 
     104 
     105      //! Store the triggers corresponding to the input slots 
     106      std::vector<COutputPin*> triggers; 
     107 
     108      //! Whether some triggers have been set 
     109      bool hasTriggers; 
    81110  }; // class CInputPin 
    82111} // namespace xios 
  • XIOS/trunk/src/filter/output_pin.cpp

    r637 r1006  
    44namespace xios 
    55{ 
     6  COutputPin::COutputPin(CGarbageCollector& gc, bool manualTrigger /*= false*/) 
     7    : gc(gc) 
     8    , manualTrigger(manualTrigger) 
     9  { /* Nothing to do */ } 
     10 
    611  void COutputPin::connectOutput(boost::shared_ptr<CInputPin> inputPin, size_t inputSlot) 
    712  { 
     
    1116 
    1217    outputs.push_back(std::make_pair(inputPin, inputSlot)); 
     18 
     19    if (canBeTriggered()) 
     20      inputPin->setInputTrigger(inputSlot, this); 
     21  } 
     22 
     23  void COutputPin::onOutputReady(CDataPacketPtr packet) 
     24  { 
     25    if (!packet) 
     26      ERROR("void COutputPin::onOutputReady(CDataPacketPtr packet)", 
     27            "The packet cannot be null."); 
     28 
     29    if (manualTrigger) // Don't use canBeTriggered here, this function is virtual and can be overriden 
     30    { 
     31      outputPackets[packet->timestamp] = packet; 
     32      gc.registerObject(this, packet->timestamp); 
     33    } 
     34    else 
     35      deliverOuput(packet); 
    1336  } 
    1437 
     
    2346      it->first->setInput(it->second, packet); 
    2447  } 
     48 
     49  void COutputPin::trigger(Time timestamp) 
     50  { 
     51    if (manualTrigger) // Don't use canBeTriggered here, this function is virtual and can be overriden 
     52    { 
     53      std::map<Time, CDataPacketPtr>::iterator it = outputPackets.find(timestamp); 
     54      if (it != outputPackets.end()) 
     55      { 
     56        gc.unregisterObject(this, timestamp); 
     57        deliverOuput(it->second); 
     58        outputPackets.erase(it); 
     59      } 
     60    } 
     61  } 
     62 
     63  bool COutputPin::canBeTriggered() const 
     64  { 
     65    return manualTrigger; 
     66  } 
     67 
     68  void COutputPin::setOutputTriggers() 
     69  { 
     70    std::vector<std::pair<boost::shared_ptr<CInputPin>, size_t> >::iterator it, itEnd; 
     71    for (it = outputs.begin(), itEnd = outputs.end(); it != itEnd; ++it) 
     72      it->first->setInputTrigger(it->second, this); 
     73  } 
     74 
     75  void COutputPin::invalidate(Time timestamp) 
     76  { 
     77    outputPackets.erase(outputPackets.begin(), outputPackets.lower_bound(timestamp)); 
     78  } 
    2579} // namespace xios 
  • XIOS/trunk/src/filter/output_pin.hpp

    r637 r1006  
    22#define __XIOS_COutputPin__ 
    33 
     4#include "garbage_collector.hpp" 
    45#include "input_pin.hpp" 
    56 
     
    910   * An output pin handles the connections with downstream filters. 
    1011   */ 
    11   class COutputPin 
     12  class COutputPin : public InvalidableObject 
    1213  { 
    1314    public: 
     15      /*! 
     16       * Constructs an ouput pin with manual or automatic trigger 
     17       * and an associated garbage collector. 
     18       * 
     19       * \param gc the garbage collector associated with this ouput pin 
     20       * \param slotsCount the number of slots 
     21       */ 
     22      COutputPin(CGarbageCollector& gc, bool manualTrigger = false); 
     23 
    1424      /*! 
    1525       * Connects to a specific slot of the input pin of a downstream filter. 
     
    2131      void connectOutput(boost::shared_ptr<CInputPin> inputPin, size_t inputSlot); 
    2232 
     33      /*! 
     34       * Triggers the output of any buffered packet for the specified timestamp. 
     35       * 
     36       * \param timestamp the timestamp for which we are triggering the output 
     37       */ 
     38      void virtual trigger(Time timestamp); 
     39 
     40      /*! 
     41       * Tests if the pin can be triggered. 
     42       * 
     43       * \return true if the pin can be triggered 
     44       */ 
     45      bool virtual canBeTriggered() const; 
     46 
     47      /*! 
     48       * Removes all pending packets which are older than the specified timestamp. 
     49       * 
     50       * \param timestamp the timestamp used for invalidation 
     51       */ 
     52      void virtual invalidate(Time timestamp); 
     53 
    2354    protected: 
     55      /*! 
     56       * Function triggered when a packet is ready to be delivered. 
     57       * 
     58       * \param packet the packet ready for output 
     59       */ 
     60      void onOutputReady(CDataPacketPtr packet); 
     61 
     62      /*! 
     63       * Informs the downstream pins that this output pin should be triggered. 
     64       */ 
     65      void setOutputTriggers(); 
     66 
     67    private: 
    2468      /*! 
    2569       * Delivers an output packet to the downstreams filter. 
     
    2973      void deliverOuput(CDataPacketPtr packet); 
    3074 
    31     private: 
     75      CGarbageCollector& gc; //!< The garbage collector associated to the output pin 
     76 
     77      //!< Whether the ouput should be triggered manually 
     78      bool manualTrigger; 
     79 
    3280      //!< The list of connected filters and the corresponding slot numbers 
    3381      std::vector<std::pair<boost::shared_ptr<CInputPin>, size_t> > outputs; 
     82 
     83      //! Output buffer, store the packets until the output is triggered 
     84      std::map<Time, CDataPacketPtr> outputPackets; 
    3485  }; // class COutputPin 
    3586} // namespace xios 
  • XIOS/trunk/src/filter/source_filter.cpp

    r988 r1006  
    66namespace xios 
    77{ 
    8   CSourceFilter::CSourceFilter(CGrid* grid, const CDuration offset /*= NoneDu*/) 
    9     : grid(grid) 
     8  CSourceFilter::CSourceFilter(CGarbageCollector& gc, CGrid* grid, 
     9                               const CDuration offset /*= NoneDu*/, bool manualTrigger /*= false*/) 
     10    : COutputPin(gc, manualTrigger) 
     11    , grid(grid) 
    1012    , offset(offset) 
    1113  { 
     
    2830    grid->inputField(data, packet->data); 
    2931 
    30     deliverOuput(packet); 
     32    onOutputReady(packet); 
    3133  } 
    3234 
     
    6466    } 
    6567 
    66     deliverOuput(packet); 
     68    onOutputReady(packet); 
    6769  } 
    6870 
     
    7375    packet->timestamp = date; 
    7476    packet->status = CDataPacket::END_OF_STREAM; 
    75     deliverOuput(packet); 
     77    onOutputReady(packet); 
    7678  } 
    7779} // namespace xios 
  • XIOS/trunk/src/filter/source_filter.hpp

    r756 r1006  
    1919       * Constructs a source filter accepting data attached to the specified grid. 
    2020       * 
     21       * \param gc the garbage collector associated with this filter 
    2122       * \param grid the grid to which the data is attached 
    2223       * \param offset the offset applied to the timestamp of all packets 
     24       * \param manualTrigger whether the output should be triggered manually 
    2325       */ 
    24       CSourceFilter(CGrid* grid, const CDuration offset = NoneDu); 
     26      CSourceFilter(CGarbageCollector& gc, CGrid* grid, 
     27                    const CDuration offset = NoneDu, bool manualTrigger = false); 
    2528 
    2629      /*! 
  • XIOS/trunk/src/filter/spatial_transform_filter.cpp

    r1003 r1006  
    5353    CDataPacketPtr outputPacket = spaceFilter->applyFilter(data, outputDefaultValue); 
    5454    if (outputPacket) 
    55       deliverOuput(outputPacket); 
     55      onOutputReady(outputPacket); 
    5656  } 
    5757 
  • XIOS/trunk/src/filter/store_filter.cpp

    r997 r1006  
    88  CStoreFilter::CStoreFilter(CGarbageCollector& gc, CContext* context, CGrid* grid) 
    99    : CInputPin(gc, 1) 
     10    , gc(gc) 
    1011    , context(context) 
    1112    , grid(grid) 
     
    2728    do 
    2829    { 
     30      if (canBeTriggered()) 
     31        trigger(timestamp); 
     32 
    2933      timer.resume(); 
    3034 
     
    7478    // The packet is always destroyed by the garbage collector 
    7579    // so we register but never unregister 
    76     gc.registerFilter(this, data[0]->timestamp); 
     80    gc.registerObject(this, data[0]->timestamp); 
    7781  } 
    7882 
  • XIOS/trunk/src/filter/store_filter.hpp

    r639 r1006  
    6565 
    6666    private: 
     67      CGarbageCollector& gc; //!< The garbage collector associated to the filter 
    6768      CContext* context; //!< The context to which the data belongs 
    6869      CGrid* grid; //!< The grid attached to the data the filter can accept 
  • XIOS/trunk/src/node/field.cpp

    r1003 r1006  
    800800       // Check if the data is to be read from a file 
    801801       else if (file && !file->mode.isEmpty() && file->mode == CFile::mode_attr::read) 
    802          instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid, 
    803                                                                                                      freq_offset.isEmpty() ? NoneDu : freq_offset)); 
     802         instantDataFilter = serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 
     803                                                                                                     freq_offset.isEmpty() ? NoneDu : freq_offset, 
     804                                                                                                     true)); 
    804805       else // The data might be passed from the model 
    805          instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 
     806         instantDataFilter = clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 
    806807     } 
    807808 
     
    874875       { 
    875876         if (!serverSourceFilter) 
    876            serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid, 
    877                                                                                    freq_offset.isEmpty() ? NoneDu : freq_offset)); 
     877           serverSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid, 
     878                                                                                   freq_offset.isEmpty() ? NoneDu : freq_offset, 
     879                                                                                   true)); 
    878880 
    879881         selfReferenceFilter = serverSourceFilter; 
     
    888890       { 
    889891         if (!clientSourceFilter) 
    890            clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(grid)); 
     892           clientSourceFilter = boost::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid)); 
    891893 
    892894         selfReferenceFilter = clientSourceFilter; 
Note: See TracChangeset for help on using the changeset viewer.