- Timestamp:
- 09/16/20 18:34:23 (4 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src
- Files:
-
- 2 added
- 2 deleted
- 20 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/config/field_attribute.conf
r1524 r1934 24 24 DECLARE_ATTRIBUTE(StdString, scalar_ref, false) 25 25 DECLARE_ATTRIBUTE(StdString, grid_ref) 26 DECLARE_ATTRIBUTE(StdString, field_ref )26 DECLARE_ATTRIBUTE(StdString, field_ref, false) 27 27 DECLARE_ATTRIBUTE(StdString, grid_path) 28 28 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/distribution/grid_scatterer_connector.hpp
r1918 r1934 68 68 { 69 69 list<CMessage> messages; 70 for(auto ranksData : dataOut)70 for(auto& ranksData : dataOut) 71 71 { 72 72 int rank = ranksData.first ; … … 78 78 } 79 79 client->sendEvent(event) ; 80 } 81 80 } 81 82 void transfer(CContextClient* client, CEventClient& event, const CMessage& messageHeader) 83 { 84 list<CMessage> messages; 85 for(auto& it : nbSenders_) 86 { 87 int rank = it.first ; 88 auto& nbSender = it.second ; 89 90 messages.push_back(CMessage(messageHeader)); 91 event.push(rank, nbSenders_[rank], messages.back()); 92 } 93 client->sendEvent(event) ; 94 } 82 95 }; 83 96 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_client_source_filter.cpp
r1930 r1934 14 14 { 15 15 CContext* context = CContext::getCurrent(); 16 field_=field ; 16 17 grid_= field-> getGrid(); 17 18 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_client_source_filter.hpp
r1930 r1934 37 37 38 38 private: 39 CField* field_ ; 39 40 CGrid* grid_; //!< The grid attached to the data the filter can accept 40 41 CDuration freqOp_ ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.cpp
r1930 r1934 6 6 #include "calendar_util.hpp" 7 7 #include "context.hpp" 8 #include "event_client.hpp" 9 #include "timer.hpp" 10 #include "tracer.hpp" 8 11 #include <limits> 9 12 … … 14 17 { 15 18 CContext* context = CContext::getCurrent(); 19 field_ = field ; 16 20 grid_= field->getGrid(); 17 freqOp_ = field->fileIn_->output_freq ; 21 freqOp_ = field->getRelFile()->output_freq ; 22 client_= field->getRelFile()->getContextClient() ; 18 23 lastDateReceived_ = context->getCalendar()->getInitDate(); 19 24 offset_ = field->freq_offset ; … … 39 44 if (!wasEOF) dateEOF_ = lastDateReceived_; 40 45 packet->status = CDataPacket::END_OF_STREAM; 46 info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 47 info(20)<<"lastDateReceived_ "<<lastDateReceived_<< " date "<<packet->date<<" ----> EOF"<<endl; 48 41 49 } 42 50 else 43 51 { 44 grid_->getServerFromClientConnector()->transfer(event, packet->data) ; 52 CContextClient* client = event.getContextServer()->getAssociatedClient() ; 53 grid_->getClientFromServerConnector(client)->transfer(event, packet->data) ; // to avoid to make a search in map for corresponding client connector, 54 55 info(20)<<"Receiv Data from server to client: FieldId : "<<field_->getId()<<endl ; 56 info(20)<<"lastDateReceived_ "<<lastDateReceived_<< " date "<<packet->date<<endl; // make a registration at initialization once 45 57 packet->status = CDataPacket::NO_ERROR; 46 58 } … … 49 61 } 50 62 63 int CClientFromServerSourceFilter::sendReadDataRequest(const CDate& tsDataRequested) 64 { 65 CContext* context = CContext::getCurrent(); 66 lastDataRequestedFromServer_ = tsDataRequested; 67 68 // No need to send the request if we are sure that we are already at EOF 69 if (!isEOF_ || context->getCalendar()->getCurrentDate() <= dateEOF_) 70 { 71 CEventClient event(field_->getType(), CField::EVENT_ID_READ_DATA); 72 if (client_->isServerLeader()) 73 { 74 CMessage msg; 75 msg << field_->getId(); 76 for(auto& rank : client_->getRanksServerLeader()) event.push(rank, 1, msg); 77 client_->sendEvent(event); 78 } 79 else client_->sendEvent(event); 80 } 81 else 82 { 83 CDataPacketPtr packet(new CDataPacket); 84 packet->date = tsDataRequested; 85 packet->timestamp = packet->date ; 86 packet->status = CDataPacket::END_OF_STREAM; 87 onOutputReady(packet); 88 } 89 90 wasDataRequestedFromServer_ = true; 91 92 return !isEOF_; 93 } 94 95 bool CClientFromServerSourceFilter::sendReadDataRequestIfNeeded(void) 96 TRY 97 { 98 const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 99 100 bool dataRequested = false; 101 102 while (currentDate >= lastDataRequestedFromServer_) 103 { 104 info(20) << "currentDate : " << currentDate << endl ; 105 info(20) << "Field : " << field_->getId() << endl ; 106 info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer_ << endl ; 107 info(20) << "freqOp : " << freqOp_ << endl ; 108 info(20) << "lastDataRequestedFromServer + fileIn_->output_freq.getValue() : " << lastDataRequestedFromServer_ + freqOp_ << endl ; 109 110 dataRequested |= sendReadDataRequest(lastDataRequestedFromServer_ + freqOp_); 111 } 112 113 return dataRequested; 114 } 115 CATCH 116 117 void CClientFromServerSourceFilter::checkForLateData(void) 118 TRY 119 { 120 CContext* context = CContext::getCurrent(); 121 // Check if data previously requested has been received as expected 122 if (wasDataRequestedFromServer_ && ! isEOF_) 123 { 124 CTimer timer("CClientFromServerSourceFilter::checkForLateDataFromServer"); 125 timer.resume(); 126 traceOff() ; 127 timer.suspend(); 128 129 bool isLate; 130 do 131 { 132 isLate = isDataLate(); 133 if (isLate) 134 { 135 timer.resume(); 136 context->globalEventLoop(); 137 timer.suspend(); 138 } 139 } 140 while (isLate && timer.getCumulatedTime() < CXios::recvFieldTimeout); 141 timer.resume(); 142 traceOn() ; 143 timer.suspend() ; 144 145 146 if (isLate) 147 ERROR("void CClientFromServerSourceFilter::checkForLateDataFromServer(void)", 148 << "Late data at timestep = " << context->getCalendar()->getCurrentDate()); 149 } 150 } 151 CATCH 152 153 51 154 bool CClientFromServerSourceFilter::isDataLate(void) 52 155 { … … 60 163 61 164 } 165 166 62 167 } // namespace xios -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/client_from_server_source_filter.hpp
r1930 r1934 6 6 #include "output_pin.hpp" 7 7 #include "event_server.hpp" 8 #include "context_client.hpp" 8 9 #include "calendar_util.hpp" 9 10 … … 38 39 bool isDataLate(void) ; 39 40 bool isEOF() {return isEOF_ ;} 41 int sendReadDataRequest(const CDate& tsDataRequested) ; 42 bool sendReadDataRequestIfNeeded(void) ; 43 void checkForLateData(void) ; 40 44 41 45 private: 46 CField* field_; 42 47 CGrid* grid_; //!< The grid attached to the data the filter can accept 43 48 CDuration freqOp_ ; 44 49 CDuration offset_ ; 45 50 CContextClient* client_ = nullptr ; 46 51 bool wasDataAlreadyReceived_= false ; 47 52 CDate lastDateReceived_ ; 53 bool wasDataRequestedFromServer_ = false ; 54 CDate lastDataRequestedFromServer_ ; 55 48 56 bool isEOF_ = false ; 49 57 CDate dateEOF_ ; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/filter/file_reader_source_filter.cpp
r1930 r1934 30 30 packet->status = CDataPacket::NO_ERROR; 31 31 32 CArray<double,1> data ;33 32 if (!isInitialized_) initialize() ; 34 33 CField::EReadField readState = CField::RF_DATA; 35 36 34 if ( nStepMax_==0 || (nStep_ >= nStepMax_ && !isCyclic_)) readState = CField::RF_EOF; 37 35 38 36 if (CField::RF_EOF != readState) 39 37 { 40 if ( file_->isEmptyZone()) readData(data) ;38 if (!file_->isEmptyZone()) readData(packet->data) ; 41 39 else readState = CField::RF_NODATA; 42 40 } … … 45 43 if (readState == CField::RF_DATA) packet->status = CDataPacket::NO_ERROR; 46 44 else packet->status = CDataPacket::END_OF_STREAM; 47 45 46 info(20)<<"Read data from file : FieldId "<<field_->getId()<<" nStep "<<nStep_<<" date : "<<packet->date<<endl ; 47 48 48 onOutputReady(packet); 49 49 } … … 53 53 CContext* context = CContext::getCurrent(); 54 54 file_->initRead(); 55 if ( file_->isEmptyZone())55 if (!file_->isEmptyZone()) 56 56 { 57 57 file_->checkReadFile(); … … 60 60 } 61 61 MPI_Allreduce(MPI_IN_PLACE, &nStepMax_, 1, MPI_INT, MPI_MAX, context->getIntraComm()); 62 isInitialized_=true; 62 63 } 63 64 … … 65 66 { 66 67 CGridLocalConnector* connector = grid_->getFullToWorkflowConnector() ; 67 CArray<double,1> dataIn(connector->get DstSize()) ;68 CArray<double,1> dataIn(connector->getSrcSize()) ; 68 69 file_->getDataInput()->readFieldData(field_, nStep_%nStepMax_, dataIn); 70 data.resize(connector->getDstSize()) ; 69 71 connector->transfer(dataIn, data) ; 70 72 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.cpp
r1930 r1934 1706 1706 clientToServerConnector_[client]->transfer(maskIn,client,event3,message3) ; 1707 1707 1708 clientFromServerConnector_[client] = new CGathererConnector(clientToServerElement.getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW)); 1709 clientFromServerConnector_[client]->computeConnector() ; 1708 1710 1709 1711 … … 1770 1772 serverFromClientConnector_ = new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW)) ; 1771 1773 serverFromClientConnector_->computeConnector() ; 1774 1775 serverToClientConnector_ = new CScattererConnector(localElement_->getView(CElementView::WORKFLOW), elementFrom_->getView(CElementView::FULL), 1776 context->getIntraComm()) ; 1777 serverToClientConnector_->computeConnector() ; 1778 1772 1779 } 1773 1780 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.hpp
r1930 r1934 277 277 CGathererConnector* getServerFromClientConnector(void) { return serverFromClientConnector_ ;} 278 278 279 279 private: 280 CScattererConnector* serverToClientConnector_ = nullptr ; 281 public: 282 CScattererConnector* getServerToClientConnector(void) { return serverToClientConnector_ ;} 283 284 private: 285 map<CContextClient*,CGathererConnector*> clientFromServerConnector_ ; 286 public: 287 CGathererConnector* getClientFromServerConnector(CContextClient* client) { return clientFromServerConnector_[client] ;} 280 288 281 289 DECLARE_REF_FUNC(Axis,axis) -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
r1883 r1934 1246 1246 for(auto field : fileInField) 1247 1247 { 1248 field->connectToServerInput(garbageCollector) ; // connect t Fhe field to server filter1249 field->computeGridIndexToFileServer() ; // compute grid index for transfer to the server context1248 field->connectToServerInput(garbageCollector) ; // connect the field to server filter 1249 // obsolete field->computeGridIndexToFileServer() ; // compute grid index for transfer to the server context 1250 1250 field->sendFieldToInputFileServer() ; 1251 fileInFields_.push_back(field) ; 1251 1252 } 1252 1253 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/domain.cpp
r1930 r1934 2354 2354 clientToServerConnector_[client]->transfer(maskIn,client,event3,message3) ; 2355 2355 2356 clientFromServerConnector_[client] = new CGathererConnector(clientToServerElement.getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW)); 2357 clientFromServerConnector_[client]->computeConnector() ; 2358 2356 2359 } 2357 2360 CATCH … … 2421 2424 serverFromClientConnector_ = new CGathererConnector(elementFrom_->getView(CElementView::FULL), localElement_->getView(CElementView::WORKFLOW)) ; 2422 2425 serverFromClientConnector_->computeConnector() ; 2426 2427 serverToClientConnector_ = new CScattererConnector(localElement_->getView(CElementView::WORKFLOW), elementFrom_->getView(CElementView::FULL), 2428 context->getIntraComm()) ; 2429 serverToClientConnector_->computeConnector() ; 2430 2423 2431 } 2424 2432 } -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/domain.hpp
r1930 r1934 349 349 public: 350 350 CScattererConnector* getClientToServerConnector(CContextClient* client) { return clientToServerConnector_[client] ;} 351 351 352 private: 352 353 CGathererConnector* gathererConnector_ ; … … 354 355 CDistributedElement* elementFrom_ ; 355 356 public: 356 CGathererConnector* getServerFromClientConnector(void) { return serverFromClientConnector_ ;} 357 357 CGathererConnector* getServerFromClientConnector(void) { return serverFromClientConnector_ ;} 358 359 private: 360 CScattererConnector* serverToClientConnector_ = nullptr ; 361 public: 362 CScattererConnector* getServerToClientConnector(void) { return serverToClientConnector_ ;} 363 364 private: 365 map<CContextClient*,CGathererConnector*> clientFromServerConnector_ ; 366 public: 367 CGathererConnector* getClientFromServerConnector(CContextClient* client) { return clientFromServerConnector_[client] ;} 358 368 359 369 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
r1930 r1934 150 150 CATCH 151 151 152 /* obsolete old interface153 void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client)154 TRY155 {156 CTimer::get("Field : send data").resume();157 int receiverSize = client->serverSize;158 159 CEventClient event(getType(), EVENT_ID_UPDATE_DATA);160 161 map<int, CArray<int,1> >::iterator it;162 list<CMessage> list_msg;163 list<CArray<double,1> > list_data;164 165 if (!grid_->doGridHaveDataDistributed(client))166 {167 if (client->isServerLeader())168 {169 for (it = grid_->storeIndex_toSrv_[client].begin(); it != grid_->storeIndex_toSrv_[client].end(); it++)170 {171 int rank = it->first;172 CArray<int,1>& index = it->second;173 174 list_msg.push_back(CMessage());175 list_data.push_back(CArray<double,1>(index.numElements()));176 177 CArray<double,1>& data_tmp = list_data.back();178 for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n));179 180 list_msg.back() << getId() << timeStamp << data_tmp;181 event.push(rank, 1, list_msg.back());182 }183 client->sendEvent(event);184 }185 else client->sendEvent(event);186 }187 else188 {189 for (it = grid_->storeIndex_toSrv_[client].begin(); it != grid_->storeIndex_toSrv_[client].end(); it++)190 {191 int rank = it->first;192 CArray<int,1>& index = it->second;193 194 list_msg.push_back(CMessage());195 list_data.push_back(CArray<double,1>(index.numElements()));196 197 CArray<double,1>& data_tmp = list_data.back();198 for (int n = 0; n < data_tmp.numElements(); n++) data_tmp(n) = data(index(n));199 200 list_msg.back() << getId() << timeStamp << data_tmp;201 event.push(rank, grid_->nbSenders_[receiverSize][rank], list_msg.back());202 }203 client->sendEvent(event);204 }205 206 CTimer::get("Field : send data").suspend();207 }208 CATCH_DUMP_ATTR209 */210 211 152 void CField::sendUpdateData(Time timeStamp, const CArray<double,1>& data, CContextClient* client) 212 153 TRY … … 222 163 CATCH_DUMP_ATTR 223 164 224 /* old version obsolete 225 void CField::recvUpdateData(CEventServer& event) 226 TRY 227 { 228 std::map<int,CBufferIn*> rankBuffers; 229 230 list<CEventServer::SSubEvent>::iterator it; 231 string fieldId; 232 CTimer::get("Field : recv data").resume(); 233 for (it = event.subEvents.begin(); it != event.subEvents.end(); ++it) 234 { 235 int rank = it->rank; 236 CBufferIn* buffer = it->buffer; 237 *buffer >> fieldId; 238 rankBuffers[rank] = buffer; 239 } 240 get(fieldId)->recvUpdateData(rankBuffers); 241 CTimer::get("Field : recv data").suspend(); 242 } 243 CATCH 244 */ 245 165 246 166 void CField::recvUpdateData(CEventServer& event) 247 167 TRY … … 260 180 } 261 181 CATCH 262 /* 263 void CField::recvUpdateDataFromClient(CEventServer& event) 264 TRY 265 { 266 Time timeStamp ; 267 for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> timeStamp ; 268 269 CArray<double,1> recvData ; 270 getGrid()->getServerFromClientConnector()->transfer(event,recvData) ; 271 this->setData(recvData); 272 } 273 CATCH 274 */ 275 276 /* 277 void CField::recvUpdateDataFromCoupler(CEventServer& event) 278 TRY 279 { 280 CContext* context = CContext::getCurrent(); 281 Time timeStamp ; 282 if (wasDataAlreadyReceivedFromServer) 283 { 284 lastDataReceivedFromServer = lastDataReceivedFromServer + freq_op; 285 } 286 else 287 { 288 // unlikely to input from file server where data are received at ts=0 289 // for coupling, it would be after the first freq_op, because for now we don't have 290 // restart mecanism to send the value at ts=0. It must be changed in future 291 lastDataReceivedFromServer = context->getCalendar()->getInitDate(); 292 wasDataAlreadyReceivedFromServer = true; 293 } 294 295 CArray<double,1> recvData ; 296 getGrid()->getServerFromClientConnector()->transfer(event,recvData) ; 297 clientSourceFilter->streamData(lastDataReceivedFromServer, recvData); 298 299 } 300 CATCH_DUMP_ATTR 301 */ 302 303 304 305 306 /* old interface to be removed.... */ 307 void CField::recvUpdateDataFromClient(std::map<int,CBufferIn*>& rankBuffers) 308 TRY 309 { 310 // ym to remove latter 311 /* 312 CContext* context = CContext::getCurrent(); 313 Time timeStamp ; 314 size_t sizeData = 0; 315 if (0 == recvDataSrv.numElements()) 316 { 317 CArray<int,1>& storeClient = grid_->getStoreIndex_client(); 318 319 // Gather all data from different clients 320 recvDataSrv.resize(storeClient.numElements()); 321 recvFoperationSrv = std::shared_ptr<func::CFunctor>(new func::CInstant(recvDataSrv)); 322 } 323 324 CArray<double,1> recv_data_tmp(recvDataSrv.numElements()); 325 const CDate& currDate = context->getCalendar()->getCurrentDate(); 326 CDuration offsetAllButMonth (freq_offset.getValue().year, 0 , freq_offset.getValue().day, 327 freq_offset.getValue().hour, freq_offset.getValue().minute, 328 freq_offset.getValue().second, freq_offset.getValue().timestep); 329 const CDate opeDate = (last_operation_srv - offsetAllButMonth + context->getCalendar()->getTimeStep()) 330 + freq_op + freq_operation_srv - freq_op - context->getCalendar()->getTimeStep() + offsetAllButMonth; 331 332 if (opeDate <= currDate) 333 { 334 335 auto& outLocalIndexStoreOnClient = grid_-> getOutLocalIndexStoreOnClient() ; 336 for (auto it = outLocalIndexStoreOnClient.begin(); it != outLocalIndexStoreOnClient.end(); ++it) 337 { 338 CArray<double,1> tmp; 339 CArray<size_t,1>& indexTmp = it->second; 340 *(rankBuffers[it->first]) >> timeStamp >> tmp; 341 for (int idx = 0; idx < indexTmp.numElements(); ++idx) recv_data_tmp(indexTmp(idx)) = tmp(idx); 342 } 343 } 344 */ 345 Time timeStamp ; 346 CArray<int,1>& storeClient = grid_->getStoreIndex_client(); // replace it with local size 347 CArray<double,1> recv_data_tmp(storeClient.numElements()); 348 auto& outLocalIndexStoreOnClient = grid_-> getOutLocalIndexStoreOnClient() ; 349 for (auto it = outLocalIndexStoreOnClient.begin(); it != outLocalIndexStoreOnClient.end(); ++it) 350 { 351 CArray<double,1> tmp; 352 CArray<size_t,1>& indexTmp = it->second; 353 *(rankBuffers[it->first]) >> timeStamp >> tmp; 354 for (int idx = 0; idx < indexTmp.numElements(); ++idx) recv_data_tmp(indexTmp(idx)) = tmp(idx); 355 } 356 357 this->setData(recv_data_tmp); 358 /* 359 // delete incomming flux for server only 360 recvFoperationSrv.reset() ; 361 recvDataSrv.reset() ; 362 */ 363 } 364 CATCH_DUMP_ATTR 365 366 /* ym : old interface : to be removed... 367 void CField::writeUpdateData(const CArray<double,1>& data) 368 TRY 369 { 370 CContext* context = CContext::getCurrent(); 371 372 const CDate& currDate = context->getCalendar()->getCurrentDate(); 373 CDuration offsetAllButMonth (freq_offset.getValue().year, 0 , freq_offset.getValue().day, 374 freq_offset.getValue().hour, freq_offset.getValue().minute, 375 freq_offset.getValue().second, freq_offset.getValue().timestep); 376 const CDate opeDate = (last_operation_srv - offsetAllButMonth + context->getCalendar()->getTimeStep()) 377 + freq_op + freq_operation_srv - freq_op - context->getCalendar()->getTimeStep() + offsetAllButMonth; 378 const CDate writeDate = last_Write_srv + freq_write_srv; 379 380 if (opeDate <= currDate) 381 { 382 (*recvFoperationSrv)(data); 383 last_operation_srv = currDate; 384 } 385 386 if (writeDate < (currDate + freq_operation_srv)) 387 { 388 recvFoperationSrv->final(); 389 last_Write_srv = writeDate; 390 grid_->computeWrittenIndex(); 391 writeField(); 392 lastlast_Write_srv = last_Write_srv; 393 } 394 } 395 CATCH_DUMP_ATTR 396 */ 397 182 183 398 184 void CField::writeUpdateData(const CArray<double,1>& data) 399 185 TRY … … 434 220 \param [in] tsDataRequested timestamp when the call is made 435 221 */ 436 bool CField::sendReadDataRequest(const CDate& tsDataRequested, CContextClient* client) 437 TRY 438 { 439 CContext* context = CContext::getCurrent(); 440 441 lastDataRequestedFromServer = tsDataRequested; 442 443 // No need to send the request if we are sure that we are already at EOF 444 if (!isEOF || context->getCalendar()->getCurrentDate() <= dateEOF) 445 { 446 CEventClient event(getType(), EVENT_ID_READ_DATA); 447 if (client->isServerLeader()) 448 { 449 CMessage msg; 450 msg << getId(); 451 const std::list<int>& ranks = client->getRanksServerLeader(); 452 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) 453 event.push(*itRank, 1, msg); 454 client->sendEvent(event); 455 } 456 else client->sendEvent(event); 457 } 458 else serverSourceFilter->signalEndOfStream(tsDataRequested); 459 460 wasDataRequestedFromServer = true; 461 462 return !isEOF; 463 } 464 CATCH_DUMP_ATTR 465 222 bool CField::sendReadDataRequest(const CDate& tsDataRequested) 223 TRY 224 { 225 return clientFromServerSourceFilter_->sendReadDataRequest(tsDataRequested) ; 226 } 227 CATCH_DUMP_ATTR 228 229 466 230 /*! 467 231 Send request new data read from file if need be, that is the current data is out-of-date. … … 471 235 TRY 472 236 { 473 const CDate& currentDate = CContext::getCurrent()->getCalendar()->getCurrentDate(); 474 475 bool dataRequested = false; 476 477 while (currentDate >= lastDataRequestedFromServer) 478 { 479 info(20) << "currentDate : " << currentDate << endl ; 480 info(20) << "lastDataRequestedFromServer : " << lastDataRequestedFromServer << endl ; 481 info(20) << "fileIn_->output_freq.getValue() : " << fileIn_->output_freq.getValue() << endl ; 482 info(20) << "lastDataRequestedFromServer + fileIn_->output_freq.getValue() : " << lastDataRequestedFromServer + fileIn_->output_freq << endl ; 483 484 dataRequested |= sendReadDataRequest(lastDataRequestedFromServer + fileIn_->output_freq, fileIn_->getContextClient()); 485 } 486 487 return dataRequested; 488 } 489 CATCH_DUMP_ATTR 237 return clientFromServerSourceFilter_->sendReadDataRequestIfNeeded() ; 238 } 239 CATCH_DUMP_ATTR 240 490 241 491 242 void CField::recvReadDataRequest(CEventServer& event) … … 512 263 CATCH_DUMP_ATTR 513 264 514 /* old interface -> to remove515 void CField::recvReadDataRequest(CContextServer* server)516 TRY517 {518 519 CArray<double,1> data ;520 EReadField hasData = readField(data);521 CDate date = CContext::getCurrent()->getCalendar()->getCurrentDate();522 if (hasData == RF_DATA) fileServerReaderFilter_->streamData(date,data) ;523 else fileServerReaderFilter_->signalEndOfStream(date) ;524 525 }526 CATCH_DUMP_ATTR527 */528 265 529 266 void CField::sendUpdateDataServerToClient(bool isEOF, const CArray<double,1>& data, CContextClient* client) … … 583 320 CATCH_DUMP_ATTR 584 321 585 /*! 586 Read field from a file. 587 A field is read with the distribution of data on the server side 588 \return State of field can be read from a file 589 */ 590 // obsolete to remove 591 /* 592 CField::EReadField CField::readField(CArray<double,1>& data) 593 TRY 594 { 595 CContext* context = CContext::getCurrent(); 596 grid_->computeWrittenIndex(); 597 getRelFile()->initRead(); 598 EReadField readState = RF_DATA; 599 600 if (!getRelFile()->isEmptyZone()) 601 { 602 if (grid_->doGridHaveDataToWrite() || getRelFile()->type == CFile::type_attr::one_file) 603 { 604 CArray<int,1>& storeClient = grid_->getStoreIndex_client(); 605 data.resize(storeClient.numElements()); 606 607 getRelFile()->checkReadFile(); 608 609 if (!nstepMax) 610 { 611 nstepMax = getRelFile()->getDataInput()->getFieldNbRecords(CField::get(this)); 612 } 613 614 this->incrementNStep(); 615 616 if (getNStep() > nstepMax && (getRelFile()->cyclic.isEmpty() || !getRelFile()->cyclic) ) 617 readState = RF_EOF; 618 619 if (RF_EOF != readState) 620 getRelFile()->getDataInput()->readFieldData(CField::get(this),data); 621 } 622 } 623 else 624 { 625 this->incrementNStep(); 626 if (getNStep() > nstepMax && (getRelFile()->cyclic.isEmpty() || !getRelFile()->cyclic) ) 627 readState = RF_EOF; 628 else 629 readState = RF_NODATA; 630 631 if (!nstepMaxRead) // This can be a bug if we try to read field from zero time record 632 readState = RF_NODATA; 633 } 634 635 if (!nstepMaxRead) 636 { 637 MPI_Allreduce(MPI_IN_PLACE, &nstepMax, 1, MPI_INT, MPI_MAX, context->intraComm_); 638 nstepMaxRead = true; 639 } 640 641 return readState; 642 } 643 CATCH_DUMP_ATTR 644 */ 645 322 646 323 /* 647 324 Receive read data from server. … … 655 332 string fieldId; 656 333 for (auto& subEvent : event.subEvents) (*subEvent.buffer) >> fieldId ; 657 get(fieldId)->rec vReadDataReady(event);334 get(fieldId)->receiveReadDataReady(event); 658 335 } 659 336 CATCH 660 337 338 void CField::receiveReadDataReady(CEventServer& event) 339 TRY 340 { 341 clientFromServerSourceFilter_->streamData(event) ; 342 } 343 CATCH_DUMP_ATTR 661 344 662 345 /* old interface to be removed ..*/ … … 742 425 } 743 426 CATCH_DUMP_ATTR 744 745 746 747 void CField::receiveReadDataReady(CEventServer& event)748 TRY749 {750 clientFromServerSourceFilter_->streamData(event) ;751 }752 CATCH_DUMP_ATTR753 754 427 755 428 … … 786 459 CATCH_DUMP_ATTR 787 460 788 789 461 void CField::checkForLateDataFromServer(void) 790 462 TRY 791 463 { 792 CContext* context = CContext::getCurrent(); 793 // Check if data previously requested has been received as expected 794 if (wasDataRequestedFromServer && !clientFromServerSourceFilter_->isEOF()) 795 { 796 CTimer timer("CField::checkForLateDataFromServer"); 797 timer.resume(); 798 traceOff() ; 799 timer.suspend(); 800 801 bool isDataLate; 802 do 803 { 804 isDataLate=clientFromServerSourceFilter_->isDataLate(); 805 if (isDataLate) 806 { 807 timer.resume(); 808 809 //ym context->checkBuffersAndListen(); 810 //ym context->eventLoop(); 811 context->globalEventLoop(); 812 813 timer.suspend(); 814 } 815 } 816 while (isDataLate && timer.getCumulatedTime() < CXios::recvFieldTimeout); 817 timer.resume(); 818 traceOn() ; 819 timer.suspend() ; 820 821 822 if (isDataLate) 823 ERROR("void CField::checkForLateDataFromServer(void)", 824 << "Late data at timestep = " << context->getCalendar()->getCurrentDate()); 825 } 826 } 827 CATCH_DUMP_ATTR 828 464 clientFromServerSourceFilter_->checkForLateData() ; 465 } 466 CATCH_DUMP_ATTR 467 468 829 469 void CField::triggerLateField(void) 830 470 TRY … … 1508 1148 void CField::connectToServerInput(CGarbageCollector& gc) 1509 1149 { 1510 const bool detectMissingValues = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true);1511 const double defaultValue = detectMissingValues ? default_value : (!default_value.isEmpty() ? default_value : 0.0);1512 1513 1150 checkTimeAttributes(); 1514 serverSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, true, false, freq_offset, true, 1515 detectMissingValues, defaultValue)); 1516 //serverSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false)); 1517 serverSourceFilter -> connectOutput(inputFilter,0) ; 1151 clientFromServerSourceFilter_ = std::shared_ptr<CClientFromServerSourceFilter>(new CClientFromServerSourceFilter(gc,this)) ; 1152 clientFromServerSourceFilter_ -> connectOutput(inputFilter,0) ; 1518 1153 } 1519 1154 … … 1561 1196 { 1562 1197 fileReaderSourceFilter_ = std::shared_ptr<CFileReaderSourceFilter>(new CFileReaderSourceFilter(gc, this)); 1563 instantDataFilter->connectOutput(inputFilter, 0);1198 fileReaderSourceFilter_->connectOutput(inputFilter, 0); 1564 1199 } 1565 1200 … … 1578 1213 void CField::connectToServerToClient(CGarbageCollector& gc) 1579 1214 { 1580 serverToClient Filter_ = std::shared_ptr<CServerToClientFilter>(new CServerToClientFilter(gc, this, client));1581 instantDataFilter->connectOutput(serverToClient Filter_, 0);1215 serverToClientStoreFilter_ = std::shared_ptr<CServerToClientStoreFilter>(new CServerToClientStoreFilter(gc, this, client)); 1216 instantDataFilter->connectOutput(serverToClientStoreFilter_, 0); 1582 1217 } 1583 1218 … … 2290 1925 } 2291 1926 1927 void CField::sendFieldToInputFileServer(void) 1928 { 1929 CContext::getCurrent()->sendContextToFileServer(client); 1930 getRelFile()->sendFileToFileServer(client); 1931 grid_->sendGridToFileServer(client); 1932 read_access=true ; // not the best solution, but on server side, the field must be a starting point of the workflow 1933 // must be replace by a better solution when implementing filters for reading and send to client 1934 // on server side 1935 this->sendAllAttributesToServer(client); 1936 this->sendAddAllVariables(client); 1937 } 1938 2292 1939 void CField::sendFieldToCouplerOut(void) 2293 1940 { … … 2341 1988 2342 1989 2343 2344 2345 2346 void CField::sendFieldToInputFileServer(void)2347 {2348 CContext::getCurrent()->sendContextToFileServer(client);2349 getRelFile()->sendFileToFileServer(client);2350 grid_->sendGridToFileServer(client);2351 read_access=true ; // not the best solution, but on server side, the field must be a starting point of the workflow2352 // must be replace by a better solution when implementing filters for reading and send to client2353 // on server side2354 this->sendAllAttributesToServer(client);2355 this->sendAddAllVariables(client);2356 }2357 2358 1990 void CField::sendAddAllVariables(CContextClient* client) 2359 1991 TRY -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.hpp
r1930 r1934 23 23 #include "client_from_server_source_filter.hpp" 24 24 #include "client_to_model_store_filter.hpp" 25 #include "server_to_client_store_filter.hpp" 25 26 26 27 … … 203 204 204 205 void writeField(const CArray<double,1>& data); 205 bool sendReadDataRequest(const CDate& tsDataRequested , CContextClient* client);206 bool sendReadDataRequest(const CDate& tsDataRequested); 206 207 bool sendReadDataRequestIfNeeded(void); 207 208 static void recvReadDataRequest(CEventServer& event); … … 434 435 435 436 //! The terminal filter which send data from server to client 436 std::shared_ptr<CServerToClient Filter> serverToClientFilter_;437 std::shared_ptr<CServerToClientStoreFilter> serverToClientStoreFilter_; 437 438 438 439 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/file.cpp
r1872 r1934 1010 1010 int size = this->enabledFields.size(); 1011 1011 for (int i = 0; i < size; ++i) 1012 this->enabledFields[i]->sendReadDataRequest(CContext::getCurrent()->getCalendar()->getCurrentDate() , getContextClient());1012 this->enabledFields[i]->sendReadDataRequest(CContext::getCurrent()->getCalendar()->getCurrentDate()); 1013 1013 } 1014 1014 CATCH_DUMP_ATTR -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/grid.cpp
r1930 r1934 151 151 for (int i = 0; i < size; ++i) 152 152 { 153 if (i < domains.size()) { 153 if (i < domains.size()) 154 { 154 155 grid->axis_domain_order(i) = 2; 155 156 grid->order_.push_back(2) ; 156 157 } 157 else if ((scalars.size() < (size-nb)) < size) { 158 else if ((scalars.size() < (size-nb)) < size) 159 { 158 160 grid->axis_domain_order(i) = 1; 161 grid->order_.push_back(1) ; 159 162 } 160 163 else 164 { 161 165 grid->axis_domain_order(i) = 0; 166 grid->order_.push_back(0) ; 167 } 162 168 ++nb; 163 169 } … … 167 173 grid->axis_domain_order.resize(axisDomainOrder.numElements()); 168 174 grid->axis_domain_order = axisDomainOrder; 175 grid->order_.clear() ; 176 for(int i=0; i<axisDomainOrder.numElements();i++) grid->order_.push_back(axisDomainOrder(i)) ; 177 169 178 } 170 179 … … 2663 2672 2664 2673 vector<CScattererConnector*> clientToServerConnectors ; 2674 vector<CGathererConnector*> clientFromServerConnectors ; 2665 2675 for(int i=0 ; i<elements.size() ; i++) 2666 2676 { … … 2671 2681 domain->distributeToServer(client, gridRemoteConnector.getDistributedGlobalIndex(i)) ; 2672 2682 clientToServerConnectors.push_back(domain->getClientToServerConnector(client)) ; 2683 clientFromServerConnectors.push_back(domain->getClientFromServerConnector(client)) ; 2673 2684 } 2674 2685 else if (elements[i].type==TYPE_AXIS) … … 2678 2689 axis->distributeToServer(client, gridRemoteConnector.getDistributedGlobalIndex(i)) ; 2679 2690 clientToServerConnectors.push_back(axis->getClientToServerConnector(client)) ; 2691 clientFromServerConnectors.push_back(axis->getClientFromServerConnector(client)) ; 2692 2680 2693 } 2681 2694 else if (elements[i].type==TYPE_SCALAR) … … 2685 2698 scalar->distributeToServer(client, gridRemoteConnector.getDistributedGlobalIndex(i)) ; 2686 2699 clientToServerConnectors.push_back(scalar->getClientToServerConnector(client)) ; 2700 clientFromServerConnectors.push_back(scalar->getClientFromServerConnector(client)) ; 2687 2701 } 2688 2702 } … … 2690 2704 // compute the grid clientToServerConnector to send flux from client to servers 2691 2705 clientToServerConnector_[client] = new CGridScattererConnector(clientToServerConnectors) ; 2706 clientFromServerConnector_[client] = new CGridGathererConnector(clientFromServerConnectors) ; 2707 2692 2708 2693 2709 } … … 3309 3325 } 3310 3326 3327 void CGrid::computeServerToClientConnector(void) 3328 { 3329 vector<CScattererConnector*> connectors ; 3330 for(auto& element : getElements()) 3331 { 3332 if (element.type==TYPE_DOMAIN) connectors.push_back(element.domain->getServerToClientConnector()) ; 3333 else if (element.type==TYPE_AXIS) connectors.push_back(element.axis->getServerToClientConnector()) ; 3334 else if (element.type==TYPE_SCALAR) connectors.push_back(element.scalar->getServerToClientConnector()) ; 3335 } 3336 serverToClientConnector_ = new CGridScattererConnector(connectors) ; 3337 } 3338 3311 3339 void CGrid::computeClientFromClientConnector(void) 3312 3340 { … … 3320 3348 clientFromClientConnector_ = new CGridGathererConnector(connectors) ; 3321 3349 } 3350 3351 3322 3352 } // namespace xios -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/grid.hpp
r1930 r1934 548 548 void distributeGridToFileServer(CContextClient* client); 549 549 550 551 private: 552 CGridLocalConnector* workflowToFullConnector_ = nullptr; 553 public: 554 void computeWorkflowToFullConnector(void) ; 555 CGridLocalConnector* getWorkflowToFullConnector(void) { if (workflowToFullConnector_==nullptr) computeWorkflowToFullConnector() ; return workflowToFullConnector_;} 556 557 private: 558 CGridLocalConnector* fullToWorkflowConnector_ = nullptr; 559 public: 560 void computeFullToWorkflowConnector(void) ; 561 CGridLocalConnector* getFullToWorkflowConnector(void) { if (fullToWorkflowConnector_==nullptr) computeFullToWorkflowConnector() ; return fullToWorkflowConnector_;} 562 563 564 565 private: 566 CGridGathererConnector* clientFromClientConnector_ = nullptr ; 567 public: 568 CGridGathererConnector* getClientFromClientConnector(void) { if (clientFromClientConnector_==nullptr) computeClientFromClientConnector() ; return clientFromClientConnector_;} 569 void computeClientFromClientConnector(void) ; 570 571 private: 572 map<CContextClient*, CGridScattererConnector*> clientToClientConnector_ ; 573 public: 574 CGridScattererConnector* getClientToClientConnector(CContextClient* client) { return clientToClientConnector_[client] ;} // make some test to see if connector exits for the given client 575 576 577 private: 578 map<CContextClient*,CGridGathererConnector*> clientFromServerConnector_ ; 579 public: 580 CGridGathererConnector* getClientFromServerConnector(CContextClient* client) { return clientFromServerConnector_[client];} 581 void computeClientFromServerConnector(void) ; 582 583 private: 584 CGridScattererConnector* serverToClientConnector_=nullptr ; 585 public: 586 CGridScattererConnector* getServerToClientConnector(void) { if (serverToClientConnector_==nullptr) computeServerToClientConnector() ; return serverToClientConnector_;} 587 void computeServerToClientConnector(void) ; 550 588 private: 551 589 map<CContextClient*, CGridScattererConnector*> clientToServerConnector_ ; … … 558 596 CGridGathererConnector* getServerFromClientConnector(void) { if (serverFromClientConnector_==nullptr) computeServerFromClientConnector() ; return serverFromClientConnector_;} 559 597 void computeServerFromClientConnector(void) ; 560 561 private:562 CGridLocalConnector* workflowToFullConnector_ = nullptr;563 public:564 void computeWorkflowToFullConnector(void) ;565 CGridLocalConnector* getWorkflowToFullConnector(void) { if (workflowToFullConnector_==nullptr) computeWorkflowToFullConnector() ; return workflowToFullConnector_;}566 567 private:568 CGridLocalConnector* fullToWorkflowConnector_ = nullptr;569 public:570 void computeFullToWorkflowConnector(void) ;571 CGridLocalConnector* getFullToWorkflowConnector(void) { if (fullToWorkflowConnector_==nullptr) computeFullToWorkflowConnector() ; return fullToWorkflowConnector_;}572 573 private:574 CGridGathererConnector* clientFromClientConnector_ = nullptr ;575 public:576 CGridGathererConnector* getClientFromClientConnector(void) { if (clientFromClientConnector_==nullptr) computeClientFromClientConnector() ; return clientFromClientConnector_;}577 void computeClientFromClientConnector(void) ;578 579 private:580 map<CContextClient*, CGridScattererConnector*> clientToClientConnector_ ;581 public:582 CGridScattererConnector* getClientToClientConnector(CContextClient* client) { return clientToClientConnector_[client] ;} // make some test to see if connector exits for the given client583 584 585 private:586 CGridGathererConnector* clientFromServerConnector_ = nullptr ;587 public:588 CGridGathererConnector* getClientFromServerConnector(void) { if (clientFromServerConnector_==nullptr) computeClientFromServerConnector() ; return clientFromServerConnector_;}589 void computeClientFromServerConnector(void) ;590 598 591 599 }; // class CGrid -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/scalar.cpp
r1930 r1934 296 296 CContext* context = CContext::getCurrent(); 297 297 CDistributedElement scatteredElement(1,globalIndex) ; 298 clientToServerConnector_[client] = new CScattererConnector(localElement_->getView(CElementView::FULL), scatteredElement.getView(CElementView::FULL), context->getIntraComm()) ; 298 clientToServerConnector_[client] = new CScattererConnector(localElement_->getView(CElementView::FULL), scatteredElement.getView(CElementView::FULL), 299 context->getIntraComm()) ; 299 300 clientToServerConnector_[client] ->computeConnector() ; 301 302 // need to be completed 303 300 304 } 301 305 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/scalar.hpp
r1930 r1934 182 182 CGathererConnector* getServerFromClientConnector(void) { return serverFromClientConnector_ ;} 183 183 184 185 186 184 private: 185 CScattererConnector* serverToClientConnector_ = nullptr ; 186 public: 187 CScattererConnector* getServerToClientConnector(void) { return serverToClientConnector_ ;} 188 189 private: 190 map<CContextClient*,CGathererConnector*> clientFromServerConnector_ ; 191 public: 192 CGathererConnector* getClientFromServerConnector(CContextClient* client) { return clientFromServerConnector_[client] ;} 187 193 188 194 private: -
XIOS/dev/dev_ym/XIOS_COUPLING/src/test/generic_testcase.f90
r1929 r1934 186 186 INTEGER :: i,j,k,xy,x,y,z,w 187 187 DOUBLE PRECISION :: scale,dist 188 LOGICAL :: ierr,ok 188 LOGICAL :: ok 189 INTEGER :: ierr 189 190 190 191 LOGICAL :: ok_field2D, ok_field3D, ok_pressure, ok_field2D_sub, ok_field3D_sub,ok_field3D_recv, ok_field3D_send
Note: See TracChangeset
for help on using the changeset viewer.