Changeset 1883
- Timestamp:
- 05/19/20 16:44:47 (4 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src
- Files:
-
- 4 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
r1878 r1883 1166 1166 else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 1167 1167 1168 // client side, assign context for file reading 1168 1169 if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ; 1170 1171 // server side, assign context where to send file data read 1172 if (serviceType_==CServicesManager::CServicesManager::GATHERER || serviceType_==CServicesManager::IO_SERVER) 1173 for(auto file : this->enabledReadModeFiles) file->setContextClient(client) ; 1169 1174 1170 1175 // workflow endpoint => sent to IO/SERVER … … 1192 1197 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 1193 1198 { 1194 // no filter to send data from server to client => to be implemented (reading case) 1199 for(auto field : fileInField) 1200 { 1201 field->connectToServerToClient(garbageCollector) ; 1202 } 1195 1203 } 1196 1204 … … 1247 1255 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 1248 1256 { 1249 // no filter for reading data from file => to be implemented 1257 for(auto field : fileInField) 1258 { 1259 field->connectToFileReader(garbageCollector) ; 1260 } 1250 1261 } 1251 1262 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
r1882 r1883 24 24 #include "spatial_transform_filter.hpp" 25 25 #include "file_server_writer_filter.hpp" 26 #include "file_server_reader_filter.hpp" 27 #include "server_to_client_filter.hpp" 26 28 #include "tracer.hpp" 27 29 … … 435 437 TRY 436 438 { 437 CContextClient* client = server->getAssociatedClient() ; 439 440 CArray<double,1> data ; 441 EReadField hasData = readField(data); 442 CDate date = CContext::getCurrent()->getCalendar()->getCurrentDate(); 443 if (hasData == RF_DATA) fileServerReaderFilter_->streamData(date,data) ; 444 else fileServerReaderFilter_->signalEndOfStream(date) ; 445 446 } 447 CATCH_DUMP_ATTR 448 449 450 void CField::sendUpdateDataServerToClient(bool isEOF, const CArray<double,1>& data, CContextClient* client) 451 TRY 452 { 438 453 CEventClient event(getType(), EVENT_ID_READ_DATA_READY); 439 454 std::list<CMessage> msgs; 440 441 CArray<double,1> data ;442 EReadField hasData = readField(data);443 444 map<int, CArray<double,1> >::iterator it;445 455 if (!grid_->doGridHaveDataDistributed(client)) 446 456 { … … 455 465 CMessage& msg = msgs.back(); 456 466 msg << getId(); 457 switch (hasData) 458 { 459 case RF_DATA: 460 msg << getNStep() - 1 << data; 461 break; 462 case RF_NODATA: 463 msg << int(-2) << data; 464 break; 465 case RF_EOF: 466 default: 467 msg << int(-1); 468 break; 469 } 470 467 468 if (isEOF) msg << int(-1); 469 else msg << getNStep() - 1 << data; 470 471 471 event.push(*itRank, 1, msg); 472 472 } … … 484 484 for (auto it = outLocalIndexStoreOnClient.begin(); it != outLocalIndexStoreOnClient.end(); ++it) 485 485 { 486 CArray<size_t,1>& indexTmp = it->second; 487 CArray<double,1> tmp(indexTmp.numElements()); 488 for (int idx = 0; idx < indexTmp.numElements(); ++idx) 489 { 490 tmp(idx) = data(indexTmp(idx)); 491 } 486 492 487 493 488 msgs.push_back(CMessage()); 494 489 CMessage& msg = msgs.back(); 495 490 msg << getId(); 496 switch (hasData) 491 if (isEOF) msg << int(-1); 492 else 497 493 { 498 case RF_DATA: 499 msg << getNStep() - 1 << tmp; 500 break; 501 case RF_NODATA: 502 msg << int(-2) << tmp; 503 break; 504 case RF_EOF: 505 default: 506 msg << int(-1); 507 break; 494 CArray<size_t,1>& indexTmp = it->second; 495 CArray<double,1> tmp(indexTmp.numElements()); 496 for (int idx = 0; idx < indexTmp.numElements(); ++idx) tmp(idx) = data(indexTmp(idx)); 497 msg << getNStep() - 1 << tmp; 508 498 } 509 510 499 event.push(it->first, grid_->getNbReadSenders(client)[it->first], msg); 511 500 } … … 1481 1470 } 1482 1471 1472 /*! 1473 * Connect field to a file reader filter to read data from file (on server side). 1474 */ 1475 void CField::connectToFileReader(CGarbageCollector& gc) 1476 { 1477 fileServerReaderFilter_ = std::shared_ptr<CFileServerReaderFilter>(new CFileServerReaderFilter(gc, this)); 1478 fileServerReaderFilter_->connectOutput(inputFilter, 0); 1479 } 1483 1480 1484 1481 /*! … … 1492 1489 storeFilter = std::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid_, detectMissingValues, defaultValue)); 1493 1490 instantDataFilter->connectOutput(storeFilter, 0); 1491 } 1492 1493 1494 1495 void CField::connectToServerToClient(CGarbageCollector& gc) 1496 { 1497 serverToClientFilter_ = std::shared_ptr<CServerToClientFilter>(new CServerToClientFilter(gc, this, client)); 1498 instantDataFilter->connectOutput(serverToClientFilter_, 0); 1494 1499 } 1495 1500 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.hpp
r1882 r1883 43 43 class CFileWriterFilter; 44 44 class CFileServerWriterFilter; 45 45 class CFileServerReaderFilter; 46 class CServerToClientFilter; 46 47 ///-------------------------------------------------------------- 47 48 … … 183 184 static bool dispatchEvent(CEventServer& event); 184 185 void sendAllAttributesToServer(CContextClient* client) ; 185 void sendUpdateData(const CArray<double,1>& data);186 186 void sendUpdateData(Time timestamp, const CArray<double,1>& data, CContextClient* client); 187 void sendUpdateDataServerToClient(bool isEOF, const CArray<double,1>& data, CContextClient* client) ; 188 187 189 static void recvUpdateData(CEventServer& event); 188 190 void recvUpdateData(std::map<int,CBufferIn*>& rankBuffers); … … 250 252 void connectToServerInput(CGarbageCollector& gc) ; 251 253 void connectToModelOutput(CGarbageCollector& gc); 254 void connectToFileReader(CGarbageCollector& gc) ; 255 void connectToServerToClient(CGarbageCollector& gc) ; 252 256 253 257 void computeGridIndexToFileServer(void) ; … … 402 406 //! The terminal filter which writes data to file 403 407 std::shared_ptr<CFileServerWriterFilter> fileServerWriterFilter; 408 409 //! The source filter which read data from file 410 std::shared_ptr<CFileServerReaderFilter> fileServerReaderFilter_; 411 412 //! The terminal filter which send data from file server to client 413 std::shared_ptr<CServerToClientFilter> serverToClientFilter_; 414 415 404 416 }; // class CField 405 417
Note: See TracChangeset
for help on using the changeset viewer.