Changeset 2143
- Timestamp:
- 06/04/21 11:54:38 (22 months ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src/filter
- Files:
-
- 23 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_client_source_filter.cpp
r2022 r2143 6 6 #include "grid.hpp" 7 7 #include <limits> 8 #include "workflow_graph.hpp" 8 9 9 10 namespace xios … … 39 40 40 41 grid_->getClientFromClientConnector()->transfer(event,packet->data) ; 42 43 if(this->graphEnabled) 44 { 45 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 46 CWorkflowGraph::addNode("Client to Client Source filter", 1, false, 0, packet); 47 } 41 48 onOutputReady(packet); 42 49 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.cpp
r1988 r2143 10 10 #include "tracer.hpp" 11 11 #include <limits> 12 #include "workflow_graph.hpp" 12 13 13 14 namespace xios … … 56 57 info(20)<<"lastDateReceived_ "<<lastDateReceived_<< " date "<<packet->date<<endl; // make a registration at initialization once 57 58 packet->status = CDataPacket::NO_ERROR; 59 } 60 61 if(this->graphEnabled) 62 { 63 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 64 CWorkflowGraph::addNode("Client from Server Source filter", 1, false, 0, packet); 58 65 } 59 66 onOutputReady(packet); -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_model_store_filter.cpp
r1930 r2143 4 4 #include "timer.hpp" 5 5 #include "tracer.hpp" 6 #include "workflow_graph.hpp" 6 7 7 8 namespace xios … … 9 10 CClientToModelStoreFilter::CClientToModelStoreFilter(CGarbageCollector& gc, CField* field) 10 11 : CInputPin(gc, 1) 11 , gc_(gc) 12 , gc_(gc), graphEnabled(false) 12 13 { 13 14 context_ = CContext::getCurrent() ; … … 107 108 } 108 109 110 if(this->graphEnabled) 111 { 112 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 113 CWorkflowGraph::addNode("Client to Model Store filter", 5, true, 1, packet); 114 } 115 116 109 117 packets_.insert(std::make_pair(packet->timestamp, packet)); 110 118 // The packet is always destroyed by the garbage collector -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_model_store_filter.hpp
r1930 r2143 3 3 4 4 #include "input_pin.hpp" 5 #include "graph_package.hpp" 5 6 6 7 namespace xios … … 73 74 void virtual invalidate(Time timestamp); 74 75 76 CGraphPackage * graphPackage; 77 bool graphEnabled; 78 75 79 protected: 76 80 /*! -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_server_store_filter.cpp
r2130 r2143 7 7 #include "field.hpp" 8 8 #include "grid.hpp" 9 #include "workflow_graph.hpp" 9 10 10 11 namespace xios … … 12 13 CClientToServerStoreFilter::CClientToServerStoreFilter(CGarbageCollector& gc, CField* field, CContextClient* client) 13 14 : CInputPin(gc, 1) 14 , field_(field), client_(client) 15 , field_(field), client_(client), graphEnabled(false) 15 16 { 16 17 if (!field) … … 21 22 void CClientToServerStoreFilter::onInputReady(std::vector<CDataPacketPtr> data) 22 23 { 24 buildWorkflowGraph(data); 25 23 26 CTimer::get("Field : send data").resume(); 24 27 CEventClient event(field_->getType(), CField::EVENT_ID_UPDATE_DATA); … … 27 30 field_->getSentGrid()->getClientToServerConnector(client_)->transfer(data[0]->data, client_, event, message) ; 28 31 CTimer::get("Field : send data").suspend(); 32 } 33 34 void CClientToServerStoreFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data) 35 { 36 if(this->graphEnabled) 37 { 38 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 39 if(!data[0]->graphPackage) data[0]->graphPackage = new CGraphDataPackage; 40 41 std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end()); 42 43 CWorkflowGraph::addNode("Client to Server Store filter", 6, true, 1, data[0]); 44 45 CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]); 46 // flux can be redirected to other filters. So don't change the 'from' parameter 47 data[0]->graphPackage->currentField = this->graphPackage->inFields[0]; 48 } 29 49 } 30 50 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_to_server_store_filter.hpp
r2130 r2143 3 3 4 4 #include "input_pin.hpp" 5 #include "graph_package.hpp" 5 6 6 7 namespace xios … … 37 38 */ 38 39 bool virtual mustAutoTrigger() const; 40 41 void buildWorkflowGraph(std::vector<CDataPacketPtr> data); 39 42 40 43 /*! … … 44 47 */ 45 48 bool virtual isDataExpected(const CDate& date) const; 49 CGraphPackage * graphPackage; 50 bool graphEnabled; 46 51 47 52 protected: -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/data_packet.hpp
r1930 r2143 6 6 #include "array_new.hpp" 7 7 #include "date.hpp" 8 #include "graph_package.hpp" 8 9 9 10 namespace xios … … 21 22 END_OF_STREAM //!< Last packet of the stream, does not have data 22 23 }; 24 25 CGraphDataPackage * graphPackage; 23 26 24 27 CArray<double, 1> data; //!< Array containing the data … … 40 43 p->timestamp = timestamp; 41 44 p->status = status; 45 p->graphPackage = graphPackage; 42 46 return p; 43 47 }; 48 49 50 CDataPacket() : graphPackage(nullptr) {} 51 44 52 }; // struct CDataPacket 45 53 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/file_reader_source_filter.cpp
r1962 r2143 7 7 #include "file.hpp" 8 8 #include "context.hpp" 9 9 #include "workflow_graph.hpp" 10 10 11 11 namespace xios … … 45 45 46 46 info(20)<<"Read data from file : FieldId "<<field_->getId()<<" nStep "<<nStep_<<" date : "<<packet->date<<endl ; 47 48 if(this->graphEnabled) 49 { 50 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 51 CWorkflowGraph::addNode("File Reader Source filter", 1, false, 0, packet); 52 } 47 53 48 54 onOutputReady(packet); -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/file_writer_store_filter.cpp
r1962 r2143 4 4 #include "file.hpp" 5 5 #include "context.hpp" 6 #include "workflow_graph.hpp" 6 7 7 8 namespace xios … … 9 10 CFileWriterStoreFilter::CFileWriterStoreFilter(CGarbageCollector& gc, CField* field) 10 11 : CInputPin(gc, 1) 11 , field_(field) 12 , field_(field), graphEnabled(false) 12 13 13 14 { … … 67 68 } 68 69 nstep_ = file_->getDataOutput()->writeFieldData(field_, fieldData, lastWrite_,currentWrite, nstep_); 70 if(this->graphEnabled) 71 { 72 73 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 74 if(!data[0]->graphPackage) data[0]->graphPackage = new CGraphDataPackage; 75 data[0]->graphPackage->currentField = this->graphPackage->inFields[0]; 76 std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end()); 77 78 CWorkflowGraph::addNode("File Writer Store filter", 5, true, 1, data[0]); 79 80 CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]); 81 data[0]->graphPackage->fromFilter = this->graphPackage->filterId; 82 83 84 } 69 85 } 70 86 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/file_writer_store_filter.hpp
r1961 r2143 3 3 4 4 #include "input_pin.hpp" 5 #include "graph_package.hpp" 5 6 6 7 namespace xios … … 39 40 bool virtual isDataExpected(const CDate& date) const; 40 41 42 CGraphPackage *graphPackage; 43 bool graphEnabled; 44 41 45 protected: 42 46 /*! -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/model_to_client_source_filter.cpp
r1930 r2143 4 4 #include "calendar_util.hpp" 5 5 #include <limits> 6 #include "workflow_graph.hpp" 6 7 7 8 namespace xios … … 45 46 } 46 47 48 buildWorkflowGraph(packet); 47 49 onOutputReady(packet); 48 50 } 51 52 void CModelToClientSourceFilter::buildWorkflowGraph(CDataPacketPtr packet) 53 { 54 if(this->graphEnabled) 55 { 56 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 57 if(!packet->graphPackage) 58 { 59 packet->graphPackage = new CGraphDataPackage; 60 } 61 packet->graphPackage->fromFilter = this->graphPackage->filterId; 62 packet->graphPackage->currentField = this->graphPackage->inFields[0]; 63 CWorkflowGraph::addNode("Model to Client Source filter", 1, false, 0, packet); 64 } 65 } 66 67 68 49 69 50 70 template void CModelToClientSourceFilter::streamData<1>(CDate date, const CArray<double, 1>& data); -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/model_to_client_source_filter.hpp
r1930 r2143 37 37 void streamData(CDate date, const CArray<double, N>& data); 38 38 39 void buildWorkflowGraph(CDataPacketPtr packet); 40 41 39 42 /*! 40 43 * Transforms the data received from the server into a packet and send it -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/output_pin.cpp
r1542 r2143 6 6 COutputPin::COutputPin(CGarbageCollector& gc, bool manualTrigger /*= false*/) 7 7 : gc(gc) 8 , manualTrigger(manualTrigger) 8 , manualTrigger(manualTrigger), graphEnabled(false) 9 9 { /* Nothing to do */ } 10 10 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/output_pin.hpp
r1542 r2143 4 4 #include "garbage_collector.hpp" 5 5 #include "input_pin.hpp" 6 #include "graph_package.hpp" 6 7 7 8 namespace xios … … 65 66 */ 66 67 void virtual invalidate(Time timestamp); 68 CGraphPackage *graphPackage; 69 bool graphEnabled; 67 70 68 71 protected: -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/pass_through_filter.cpp
r641 r2143 1 1 #include "pass_through_filter.hpp" 2 #include "workflow_graph.hpp" 3 #include <algorithm> 2 4 3 5 namespace xios … … 9 11 CDataPacketPtr CPassThroughFilter::apply(std::vector<CDataPacketPtr> data) 10 12 { 13 buildWorkflowGraph(data); 11 14 return data[0]; 12 15 } 16 17 void CPassThroughFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data) 18 { 19 if(this->graphEnabled) 20 { 21 if(data[0]->graphPackage && data[0]->graphPackage->currentField->getId() == this->graphPackage->inFields[0]->getId()) 22 { 23 std::cout<<"PASS THROUGH FILTER OMITTED "<<this<<std::endl; 24 return; 25 } 26 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 27 28 if(!data[0]->graphPackage) 29 { 30 data[0]->graphPackage = new CGraphDataPackage; 31 data[0]->graphPackage->fromFilter = -1; 32 data[0]->graphPackage->currentField = this->graphPackage->inFields[0]; 33 } 34 35 for(int i=0; i<this->graphPackage->filterId; i++) 36 { 37 if(CXios::isClient 38 && (*CWorkflowGraph::vectorOfNodes_)[i].label_field_id == this->label_field_id 39 && (*CWorkflowGraph::vectorOfNodes_)[i].timestamp == data[0]->timestamp 40 && (*CWorkflowGraph::vectorOfNodes_)[i].filter_name == "Pass through filter" ) 41 { 42 data[0]->graphPackage->fromFilter = i; 43 return; 44 } 45 } 46 47 CWorkflowGraph::addNode("Pass through filter", 2, false, 1, data[0]); 48 if(CXios::isClient) (*CWorkflowGraph::vectorOfNodes_)[this->graphPackage->filterId].label_field_id = this->label_field_id; 49 CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]); 50 51 data[0]->graphPackage->currentField = this->graphPackage->inFields[0]; 52 std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end()); 53 54 data[0]->graphPackage->fromFilter = this->graphPackage->filterId; 55 56 } 57 } 58 59 13 60 } // namespace xios -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/pass_through_filter.hpp
r641 r2143 20 20 CPassThroughFilter(CGarbageCollector& gc); 21 21 22 StdString label_field_id; //used for omitting redundant pass through filter in graph 23 22 24 protected: 23 25 /*! … … 28 30 */ 29 31 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 32 void buildWorkflowGraph(std::vector<CDataPacketPtr> data); 33 30 34 }; // class CPassThroughFilter 31 35 } // namespace xios -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/server_from_client_source_filter.cpp
r1930 r2143 4 4 #include "calendar_util.hpp" 5 5 #include "context.hpp" 6 6 #include "workflow_graph.hpp" 7 7 8 8 namespace xios … … 27 27 grid_->getServerFromClientConnector()->transfer(event,packet->data) ; 28 28 29 if(this->graphEnabled) 30 { 31 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 32 packet->graphPackage = new CGraphDataPackage; 33 packet->graphPackage->fromFilter = this->graphPackage->filterId; 34 packet->graphPackage->currentField = this->graphPackage->inFields[0]; 35 CWorkflowGraph::addNode("Server from Client Source filter", 1, false, 0, packet); 36 } 29 37 onOutputReady(packet); 30 38 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/server_to_client_store_filter.cpp
r2130 r2143 5 5 #include "utils.hpp" 6 6 #include "context_client.hpp" 7 #include "workflow_graph.hpp" 7 8 8 9 namespace xios … … 10 11 CServerToClientStoreFilter::CServerToClientStoreFilter(CGarbageCollector& gc, CField* field, CContextClient* client) 11 12 : CInputPin(gc, 1) 12 , field_(field), client_(client) 13 , field_(field), client_(client), graphEnabled(false) 13 14 { 14 15 if (!field) ERROR("CServerToClientFilter::CServerToClientFilter(CField* field)", "The field cannot be null."); … … 27 28 CMessage msg ; 28 29 msg<<field_->getId() ; 30 31 if(this->graphEnabled) 32 { 33 CWorkflowGraph::addNode("Server to Client Store filter", 5, true, 1, packets[0]); 34 } 29 35 30 36 if (isEOF) -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/server_to_client_store_filter.hpp
r2130 r2143 3 3 4 4 #include "input_pin.hpp" 5 #include "graph_package.hpp" 5 6 6 7 namespace xios … … 46 47 */ 47 48 bool virtual isDataExpected(const CDate& date) const; 49 CGraphPackage * graphPackage; 50 bool graphEnabled; 48 51 49 52 protected: -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/temporal_filter.cpp
r1523 r2143 2 2 #include "functor_type.hpp" 3 3 #include "calendar_util.hpp" 4 #include "workflow_graph.hpp" 4 5 5 6 namespace xios … … 28 29 // , nextOperationDate(initDate + opFreq + this->samplingOffset) 29 30 , isFirstOperation(true) 31 , graphCycleCompleted(true) 30 32 { 31 33 } 32 34 35 void CTemporalFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data) 36 { 37 if(this->graphEnabled ) 38 { 39 if(!data[0]->graphPackage) 40 { 41 data[0]->graphPackage = new CGraphDataPackage; 42 } 43 44 if(graphCycleCompleted) 45 { 46 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 47 CWorkflowGraph::addNode("Temporal filter", 3, false, 0, data[0]); 48 graphCycleCompleted = false; 49 } 50 51 data[0]->graphPackage->currentField = this->graphPackage->inFields[0]; 52 std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end()); 53 54 CWorkflowGraph::addEdge(data[0]->graphPackage->fromFilter, this->graphPackage->filterId, data[0]); 55 data[0]->graphPackage->fromFilter = this->graphPackage->filterId; 56 this->graphPackage->sourceFilterIds.push_back(data[0]->graphPackage->fromFilter); 57 data[0]->graphPackage->currentField = this->graphPackage->inFields[0]; 58 std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end()); 59 } 60 61 } 33 62 CDataPacketPtr CTemporalFilter::apply(std::vector<CDataPacketPtr> data) 34 63 { 64 buildWorkflowGraph(data); 65 35 66 CDataPacketPtr packet; 36 67 … … 43 74 { 44 75 usePacket = (data[0]->date >= nextSamplingDate); 45 // outputResult = (data[0]->date + samplingFreq > nextOperationDate);46 76 outputResult = (data[0]->date > initDate + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth); 47 77 copyLess = (isInstantOperation && usePacket && outputResult); … … 75 105 packet->data.resize(tmpData.numElements()); 76 106 packet->data = tmpData; 107 packet->graphPackage = data[0]->graphPackage; 77 108 } 78 109 else … … 80 111 81 112 isFirstOperation = false; 82 // nextOperationDate = initDate + samplingFreq + nbOperationDates*opFreq - samplingFreq + offsetMonth + offsetAllButMonth;83 113 graphCycleCompleted = true; 114 } 84 115 } 85 116 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/temporal_filter.hpp
r1473 r2143 39 39 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data); 40 40 41 void buildWorkflowGraph(std::vector<CDataPacketPtr> data); 42 41 43 /*! 42 44 * Tests if the filter must auto-trigger. … … 52 54 */ 53 55 bool virtual isDataExpected(const CDate& date) const; 56 bool graphCycleCompleted; 54 57 55 58 private: -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/transform_filter.cpp
r2011 r2143 1 1 #include "transform_filter.hpp" 2 2 #include "grid_algorithm.hpp" 3 3 #include "workflow_graph.hpp" 4 4 namespace xios 5 5 { … … 22 22 if (packet->status == CDataPacket::NO_ERROR) 23 23 { 24 buildWorkflowGraph(data, packet, algorithm_); 25 24 26 if (data.size()>1) 25 27 { … … 33 35 } 34 36 35 37 void CTransformFilter::buildWorkflowGraph(std::vector<CDataPacketPtr> data, CDataPacketPtr packet, CGridAlgorithm* algorithm) 38 { 39 if(this->graphEnabled) 40 { 41 this->graphPackage->filterId = CWorkflowGraph::getNodeSize(); 42 43 packet->graphPackage = new CGraphDataPackage; 44 if(data[0]->graphPackage) 45 { 46 packet->graphPackage->fromFilter = data[0]->graphPackage->fromFilter; 47 } 48 packet->graphPackage->toFilter = data[0]->graphPackage->toFilter; 49 packet->graphPackage->current_filter_name = data[0]->graphPackage->current_filter_name; 50 packet->graphPackage->contextId = data[0]->graphPackage->contextId; 51 52 int tmp_from = packet->graphPackage->fromFilter; 53 if(this->graphPackage->show) 54 { 55 packet->graphPackage->currentField = this->graphPackage->inFields[0]; 56 CWorkflowGraph::addNode("Spatial transform filter "+algorithm->getAlgoName(), 4, false, 1, packet); 57 CWorkflowGraph::addEdge(packet->graphPackage->fromFilter, this->graphPackage->filterId, packet); 58 packet->graphPackage->fromFilter = this->graphPackage->filterId; 59 packet->graphPackage->currentField = this->graphPackage->inFields[0]; 60 std::rotate(this->graphPackage->inFields.begin(), this->graphPackage->inFields.begin() + 1, this->graphPackage->inFields.end()); 61 } 62 else 63 { 64 packet->graphPackage->currentField = this->graphPackage->inFields[0]; 65 if(CXios::isClient) CWorkflowGraph::vectorOfNodes_->at(tmp_from).filter_name += algorithm->getAlgoName(); 66 else CWorkflowGraph::vectorOfNodes_srv_->at(tmp_from).filter_name += algorithm->getAlgoName(); 67 68 } 69 } 70 } 36 71 37 72 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/transform_filter.hpp
r2011 r2143 22 22 */ 23 23 CDataPacketPtr virtual apply(std::vector<CDataPacketPtr> data) ; 24 void buildWorkflowGraph(std::vector<CDataPacketPtr> data, CDataPacketPtr packet, CGridAlgorithm* algorithm); 25 24 26 // void apply(const CArray<double, 1>& dataSrc, CArray<double,1>& dataDest); 25 27
Note: See TracChangeset
for help on using the changeset viewer.