Changeset 639


Ignore:
Timestamp:
07/17/15 13:58:12 (6 years ago)
Author:
rlacroix
Message:

Add a basic garbage collector to ensure no packets linger in the filter graph.

Location:
XIOS/trunk/src
Files:
2 added
11 edited

Legend:

Unmodified
Added
Removed
  • XIOS/trunk/src/filter/file_writer_filter.cpp

    r638 r639  
    55namespace xios 
    66{ 
    7   CFileWriterFilter::CFileWriterFilter(CField* field) 
    8     : CInputPin(1) 
     7  CFileWriterFilter::CFileWriterFilter(CGarbageCollector& gc, CField* field) 
     8    : CInputPin(gc, 1) 
    99    , field(field) 
    1010  { 
  • XIOS/trunk/src/filter/file_writer_filter.hpp

    r638 r639  
    1515    public: 
    1616      /*! 
    17        * Constructs the filter (with one input slot) associated to the specified field. 
     17       * Constructs the filter (with one input slot) associated to the specified field 
     18       * and a garbage collector. 
    1819       * 
     20       * \param gc the associated garbage collector 
    1921       * \param field the associated field 
    2022       */ 
    21       CFileWriterFilter(CField* field); 
     23      CFileWriterFilter(CGarbageCollector& gc, CField* field); 
    2224 
    2325    protected: 
  • XIOS/trunk/src/filter/filter.cpp

    r637 r639  
    33namespace xios 
    44{ 
    5   CFilter::CFilter(size_t inputSlotsCount, IFilterEngine* engine) 
    6     : CInputPin(inputSlotsCount) 
     5  CFilter::CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine) 
     6    : CInputPin(gc, inputSlotsCount) 
    77    , COutputPin() 
    88    , engine(engine) 
  • XIOS/trunk/src/filter/filter.hpp

    r637 r639  
    2020       * and the specified engine. 
    2121       * 
     22       * \param gc the associated garbage collector 
    2223       * \param inputSlotsCount the number of input slots 
    2324       * \param engine the filter engine 
    2425       */ 
    25       CFilter(size_t inputSlotsCount, IFilterEngine* engine); 
     26      CFilter(CGarbageCollector& gc, size_t inputSlotsCount, IFilterEngine* engine); 
    2627 
    2728    protected: 
  • XIOS/trunk/src/filter/input_pin.cpp

    r637 r639  
    11#include "input_pin.hpp" 
     2#include "garbage_collector.hpp" 
    23#include "exception.hpp" 
    34 
    45namespace xios 
    56{ 
    6   CInputPin::CInputPin(size_t slotsCount) 
    7     : slotsCount(slotsCount) 
     7  CInputPin::CInputPin(CGarbageCollector& gc, size_t slotsCount) 
     8    : gc(gc) 
     9    , slotsCount(slotsCount) 
    810  { /* Nothing to do */ } 
    911 
     
    1921    std::map<Time, InputBuffer>::iterator it = inputs.find(packet->timestamp); 
    2022    if (it == inputs.end()) 
     23    { 
    2124      it = inputs.insert(std::make_pair(packet->timestamp, InputBuffer(slotsCount))).first; 
     25      gc.registerFilter(this, packet->timestamp); 
     26    } 
    2227    it->second.slotsFilled++; 
    2328    it->second.packets[inputSlot] = packet; 
     
    2530    if (it->second.slotsFilled == slotsCount) 
    2631    { 
     32      // Unregister before calling onInputReady in case the filter registers again 
     33      gc.unregisterFilter(this, packet->timestamp); 
    2734      onInputReady(it->second.packets); 
    2835      inputs.erase(it); 
    2936    } 
    3037  } 
     38 
     39  void CInputPin::invalidate(Time timestamp) 
     40  { 
     41    inputs.erase(inputs.begin(), inputs.lower_bound(timestamp)); 
     42  } 
    3143} // namespace xios 
  • XIOS/trunk/src/filter/input_pin.hpp

    r637 r639  
    99namespace xios 
    1010{ 
     11  class CGarbageCollector; 
     12 
    1113  /*! 
    1214   * An input pin handles the data packets received by a filter. 
     
    1618    public: 
    1719      /*! 
    18        * Constructs an input pin with the specified number of slots. 
     20       * Constructs an input pin with the specified number of slots 
     21       * and an associated garbage collector. 
    1922       * 
     23       * \param gc the garbage collector associated with this input pin 
    2024       * \param slotsCount the number of slots 
    2125       */ 
    22       CInputPin(size_t slotsCount); 
     26      CInputPin(CGarbageCollector& gc, size_t slotsCount); 
    2327 
    2428      /*! 
     
    3236      void setInput(size_t inputSlot, CDataPacketPtr packet); 
    3337 
     38      /*! 
     39       * Removes all pending packets which are older than the specified timestamp. 
     40       * 
     41       * \param timestamp the timestamp used for invalidation 
     42       */ 
     43      void virtual invalidate(Time timestamp); 
     44 
    3445    protected: 
     46      CGarbageCollector& gc; //!< The garbage collector associated to the input pin 
     47 
    3548      /*! 
    3649       * Function triggered when all slots have been filled for a specific timestamp. 
  • XIOS/trunk/src/filter/store_filter.cpp

    r638 r639  
    66namespace xios 
    77{ 
    8   CStoreFilter::CStoreFilter(CContext* context, CGrid* grid) 
    9     : CInputPin(1) 
     8  CStoreFilter::CStoreFilter(CGarbageCollector& gc, CContext* context, CGrid* grid) 
     9    : CInputPin(gc, 1) 
    1010    , context(context) 
    1111    , grid(grid) 
     
    6363  { 
    6464    packets.insert(std::make_pair(data[0]->timestamp, data[0])); 
     65    // The packet is always destroyed by the garbage collector 
     66    // so we register but never unregister 
     67    gc.registerFilter(this, data[0]->timestamp); 
     68  } 
     69 
     70  void CStoreFilter::invalidate(Time timestamp) 
     71  { 
     72    CInputPin::invalidate(timestamp); 
     73    packets.erase(packets.begin(), packets.lower_bound(timestamp)); 
    6574  } 
    6675} // namespace xios 
  • XIOS/trunk/src/filter/store_filter.hpp

    r638 r639  
    1616    public: 
    1717      /*! 
    18        * Constructs the filter with one input slot for the specified grid 
    19        * and context. 
     18       * Constructs the filter with one input slot and an associated 
     19       * garbage collector for the specified grid and context. 
    2020       * 
     21       * \param gc the garbage collector associated with this input pin 
    2122       * \param context the context to which the data belongs 
    2223       * \param grid the grid to which the data is attached 
    2324       */ 
    24       CStoreFilter(CContext* context, CGrid* grid); 
     25      CStoreFilter(CGarbageCollector& gc, CContext* context, CGrid* grid); 
    2526 
    2627      /*! 
     
    4849      CDataPacket::StatusCode getData(Time timestamp, CArray<double, N>& data); 
    4950 
     51      /*! 
     52       * Removes all pending packets which are older than the specified timestamp. 
     53       * 
     54       * \param timestamp the timestamp used for invalidation 
     55       */ 
     56      void virtual invalidate(Time timestamp); 
     57 
    5058    protected: 
    5159      /*! 
  • XIOS/trunk/src/interface/c/iccalendar.cpp

    r598 r639  
    1616    context->updateCalendar(step); 
    1717    context->sendUpdateCalendar(step); 
    18     context->checkPrefetchingOfEnabledReadModeFiles(); 
    1918    CTimer::get("XIOS").suspend(); 
    2019  } 
  • XIOS/trunk/src/node/context.cpp

    r619 r639  
    931931   void CContext::updateCalendar(int step) 
    932932   { 
    933       info(50)<<"updateCalendar : before : "<<calendar->getCurrentDate()<<endl; 
     933      info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl; 
    934934      calendar->update(step); 
    935       info(50)<<"updateCalendar : after : "<<calendar->getCurrentDate()<<endl; 
     935      info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl; 
     936 
     937      if (hasClient) 
     938      { 
     939        checkPrefetchingOfEnabledReadModeFiles(); 
     940        garbageCollector.invalidate(calendar->getCurrentDate()); 
     941      } 
    936942   } 
    937943 
  • XIOS/trunk/src/node/context.hpp

    r598 r639  
    1111//#include "context_server.hpp" 
    1212#include "data_output.hpp" 
     13#include "garbage_collector.hpp" 
    1314 
    1415#include "mpi.hpp" 
     
    218219         std::map<int, StdSize> dataSize_; 
    219220         StdString idServer_; 
     221         CGarbageCollector garbageCollector; 
    220222 
    221223 
Note: See TracChangeset for help on using the changeset viewer.