Changeset 1934 for XIOS/dev/dev_ym/XIOS_COUPLING/src/filter
- Timestamp:
- 09/16/20 18:34:23 (4 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src/filter
- Files:
-
- 2 added
- 2 deleted
- 5 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_client_source_filter.cpp
r1930 r1934 14 14 { 15 15 CContext* context = CContext::getCurrent(); 16 field_=field ; 16 17 grid_= field-> getGrid(); 17 18 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_client_source_filter.hpp
r1930 r1934 37 37 38 38 private: 39 CField* field_ ; 39 40 CGrid* grid_; //!< The grid attached to the data the filter can accept 40 41 CDuration freqOp_ ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.cpp
r1930 r1934 6 6 #include "calendar_util.hpp" 7 7 #include "context.hpp" 8 #include "event_client.hpp" 9 #include "timer.hpp" 10 #include "tracer.hpp" 8 11 #include <limits> 9 12 … … 14 17 { 15 18 CContext* context = CContext::getCurrent(); 19 field_ = field ; 16 20 grid_= field->getGrid(); 17 freqOp_ = field->fileIn_->output_freq ; 21 freqOp_ = field->getRelFile()->output_freq ; 22 client_= field->getRelFile()->getContextClient() ; 18 23 lastDateReceived_ = context->getCalendar()->getInitDate(); 19 24 offset_ = field->freq_offset ; … … 39 44 if (!wasEOF) dateEOF_ = lastDateReceived_; 40 45 packet->status = CDataPacket::END_OF_STREAM; 46 info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 47 info(20)<<"lastDateReceived_ "<<lastDateReceived_<< " date "<<packet->date<<" ----> EOF"<<endl; 48 41 49 } 42 50 else 43 51 { 44 grid_->getServerFromClientConnector()->transfer(event, packet->data) ; 52 CContextClient* client = event.getContextServer()->getAssociatedClient() ; 53 grid_->getClientFromServerConnector(client)->transfer(event, packet->data) ; // to avoid to make a search in map for corresponding client connector, 54 55 info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 56 info(20)<<"lastDateReceived_ "<<lastDateReceived_<< " date "<<packet->date<<endl; // make a registration at initialization once 45 57 packet->status = CDataPacket::NO_ERROR; 46 58 } … … 49 61 } 50 62 63 int CClientFromServerSourceFilter::sendReadDataRequest(const CDate& tsDataRequested) 64 { 65 CContext* context = CContext::getCurrent(); 66 lastDataRequestedFromServer_ = tsDataRequested; 67 68 // No need to send the request if we are sure that we are already at EOF 69 if (!isEOF_ || context->getCalendar()->getCurrentDate() <= dateEOF_) 70 { 71 CEventClient event(field_->getType(), CField::EVENT_ID_READ_DATA); 72 if (client_->isServerLeader()) 73 { 74 CMessage msg; 75 msg << field_->getId(); 76 for(auto& rank : client_->getRanksServerLeader()) event.push(rank, 1, msg); 77 client_->sendEvent(event); 78 } 79 else client_->sendEvent(event); 80 } 81 else 82 { 83 CDataPacketPtr packet(new CDataPacket); 84 packet->date = tsDataRequested; 85 packet->timestamp = packet->date ; 86 packet->status = CDataPacket::END_OF_STREAM; 87 onOutputReady(packet); 88 } 89 90 wasDataRequestedFromServer_ = true; 91 92 return !isEOF_; 93 } 94 95 bool CClientFromServerSourceFilter::sendReadDataRequestIfNeeded(void) 96 TRY 97 { 98 const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 99 100 bool dataRequested = false; 101 102 while (currentDate >= lastDataRequestedFromServer_) 103 { 104 info(20) << "currentDate : " << currentDate << endl ; 105 info(20) << "Field : " << field_->getId() << endl ; 106 info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer_ << endl ; 107 info(20) << "freqOp : " << freqOp_ << endl ; 108 info(20) << "lastDataRequestedFromServer + fileIn_->output_freq.getValue() : " << lastDataRequestedFromServer_ + freqOp_ << endl ; 109 110 dataRequested |= sendReadDataRequest(lastDataRequestedFromServer_ + freqOp_); 111 } 112 113 return dataRequested; 114 } 115 CATCH 116 117 void CClientFromServerSourceFilter::checkForLateData(void) 118 TRY 119 { 120 CContext* context = CContext::getCurrent(); 121 // Check if data previously requested has been received as expected 122 if (wasDataRequestedFromServer_ && ! isEOF_) 123 { 124 CTimer timer("CClientFromServerSourceFilter::checkForLateDataFromServer"); 125 timer.resume(); 126 traceOff() ; 127 timer.suspend(); 128 129 bool isLate; 130 do 131 { 132 isLate = isDataLate(); 133 if (isLate) 134 { 135 timer.resume(); 136 context->globalEventLoop(); 137 timer.suspend(); 138 } 139 } 140 while (isLate && timer.getCumulatedTime() < CXios::recvFieldTimeout); 141 timer.resume(); 142 traceOn() ; 143 timer.suspend() ; 144 145 146 if (isLate) 147 ERROR("void CClientFromServerSourceFilter::checkForLateDataFromServer(void)", 148 << "Late data at timestep = " << context->getCalendar()->getCurrentDate()); 149 } 150 } 151 CATCH 152 153 51 154 bool CClientFromServerSourceFilter::isDataLate(void) 52 155 { … … 60 163 61 164 } 165 166 62 167 } // namespace xios -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.hpp
r1930 r1934 6 6 #include "output_pin.hpp" 7 7 #include "event_server.hpp" 8 #include "context_client.hpp" 8 9 #include "calendar_util.hpp" 9 10 … … 38 39 bool isDataLate(void) ; 39 40 bool isEOF() {return isEOF_ ;} 41 int sendReadDataRequest(const CDate& tsDataRequested) ; 42 bool sendReadDataRequestIfNeeded(void) ; 43 void checkForLateData(void) ; 40 44 41 45 private: 46 CField* field_; 42 47 CGrid* grid_; //!< The grid attached to the data the filter can accept 43 48 CDuration freqOp_ ; 44 49 CDuration offset_ ; 45 50 CContextClient* client_ = nullptr ; 46 51 bool wasDataAlreadyReceived_= false ; 47 52 CDate lastDateReceived_ ; 53 bool wasDataRequestedFromServer_ = false ; 54 CDate lastDataRequestedFromServer_ ; 55 48 56 bool isEOF_ = false ; 49 57 CDate dateEOF_ ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/file_reader_source_filter.cpp
r1930 r1934 30 30 packet->status = CDataPacket::NO_ERROR; 31 31 32 CArray<double,1> data ;33 32 if (!isInitialized_) initialize() ; 34 33 CField::EReadField readState = CField::RF_DATA; 35 36 34 if ( nStepMax_==0 || (nStep_ >= nStepMax_ && !isCyclic_)) readState = CField::RF_EOF; 37 35 38 36 if (CField::RF_EOF != readState) 39 37 { 40 if ( file_->isEmptyZone()) readData(data) ;38 if (!file_->isEmptyZone()) readData(packet->data) ; 41 39 else readState = CField::RF_NODATA; 42 40 } … … 45 43 if (readState == CField::RF_DATA) packet->status = CDataPacket::NO_ERROR; 46 44 else packet->status = CDataPacket::END_OF_STREAM; 47 45 46 info(20)<<"Read data from file : FieldId "<<field_->getId()<<" nStep "<<nStep_<<" date : "<<packet->date<<endl ; 47 48 48 onOutputReady(packet); 49 49 } … … 53 53 CContext* context = CContext::getCurrent(); 54 54 file_->initRead(); 55 if ( file_->isEmptyZone())55 if (!file_->isEmptyZone()) 56 56 { 57 57 file_->checkReadFile(); … … 60 60 } 61 61 MPI_Allreduce(MPI_IN_PLACE, &nStepMax_, 1, MPI_INT, MPI_MAX, context->getIntraComm()); 62 isInitialized_=true; 62 63 } 63 64 … … 65 66 { 66 67 CGridLocalConnector* connector = grid_->getFullToWorkflowConnector() ; 67 CArray<double,1> dataIn(connector->get DstSize()) ;68 CArray<double,1> dataIn(connector->getSrcSize()) ; 68 69 file_->getDataInput()->readFieldData(field_, nStep_%nStepMax_, dataIn); 70 data.resize(connector->getDstSize()) ; 69 71 connector->transfer(dataIn, data) ; 70 72
Note: See TracChangeset
for help on using the changeset viewer.