Changeset 1871
- Timestamp:
- 04/21/20 16:46:20 (4 years ago)
- Location:
- XIOS/dev/dev_ym/XIOS_COUPLING/src
- Files:
-
- 10 edited
Legend:
- Unmodified
- Added
- Removed
-
XIOS/dev/dev_ym/XIOS_COUPLING/src/io/nc4_data_output.cpp
r1869 r1871 1640 1640 { 1641 1641 if (grid->doGridHaveDataDistributed()) 1642 grid->get DistributionServer()->computeGlobalIndex(indexes);1642 grid->getServerDistribution()->computeGlobalIndex(indexes); 1643 1643 1644 1644 std::vector<StdSize> start, count; -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/axis.cpp
r1870 r1871 1075 1075 // Some value should be reset here 1076 1076 data_begin.setValue(0); 1077 data_n.setValue(data_index.numElements()); 1077 1078 globalLocalIndexMap_.rehash(std::ceil(index.numElements()/globalLocalIndexMap_.max_load_factor())); 1078 1079 // for (int idx = 0; idx < index.numElements(); ++idx) globalLocalIndexMap_[idx] = index(idx); -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/context.cpp
r1870 r1871 1017 1017 // postProcessing(); 1018 1018 1019 1019 1020 // Make sure the calendar was correctly created 1021 if (serviceType_!=CServicesManager::CLIENT) CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar(); 1020 1022 if (!calendar) 1021 1023 ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"") … … 1058 1060 vector<CField*>& fieldWithReadAccess = fieldsWithReadAccess_ ; 1059 1061 vector<CField*> fieldModelIn ; // fields potentially from model 1060 1062 1063 // define if files are on clientSied or serverSide 1064 if (serviceType_==CServicesManager::CLIENT) 1065 { 1066 for (auto& file : enabledWriteModeFiles) file->setClientSide() ; 1067 for (auto& file : enabledReadModeFiles) file->setClientSide() ; 1068 } 1069 else 1070 { 1071 for (auto& file : enabledWriteModeFiles) file->setServerSide() ; 1072 for (auto& file : enabledReadModeFiles) file->setServerSide() ; 1073 } 1074 1075 1076 1061 1077 // find all field potentially at workflow end 1062 1078 vector<CField*> endWorkflowFields ; … … 1088 1104 } 1089 1105 1106 // workflow endpoint => write to file 1107 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) 1108 { 1109 for(auto field : fileOutField) 1110 { 1111 field->connectToFileWriter(garbageCollector) ; // connect the field to server filter 1112 } 1113 } 1114 1115 // workflow endpoint => Send data from server to client 1116 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 1117 { 1118 // no filter to send data from server to client => to be implemented (reading case) 1119 } 1120 1121 // workflow endpoint => sent to model on client side 1122 if (serviceType_==CServicesManager::CLIENT) 1123 { 1124 for(auto field : fieldWithReadAccess) field->connectToModelOutput(garbageCollector) ; 1125 } 1126 1090 1127 1091 1128 // workflow startpoint => data from model … … 1098 1135 } 1099 1136 } 1100 1101 1137 1138 // workflow startpoint => data from client on server side 1139 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER || serviceType_==CServicesManager::OUT_SERVER) 1140 { 1141 for(auto field : fieldModelIn) 1142 { 1143 field->connectToClientInput(garbageCollector) ; // connect the field to server filter 1144 } 1145 } 1146 1147 // workflow startpoint => data from server on client side 1148 if (serviceType_==CServicesManager::CLIENT) 1149 { 1150 for(auto field : fileInField) 1151 { 1152 field->connectToServerInput(garbageCollector) ; // connect the field to server filter 1153 field->computeGridIndexToFileServer() ; // compute grid index for transfer to the server context 1154 field->sendFieldToFileServer() ; 1155 } 1156 } 1157 1158 // workflow startpoint => data read from file on server side 1159 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::GATHERER) 1160 { 1161 // no filter for reading data from file => to be implemented 1162 } 1163 1164 1165 if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) this->sendCloseDefinition(); 1166 if (serviceType_==CServicesManager::IO_SERVER || serviceType_==CServicesManager::OUT_SERVER) createFileHeader(); 1167 if (serviceType_==CServicesManager::CLIENT) startPrefetchingOfEnabledReadModeFiles(); 1168 1169 1102 1170 1103 1171 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/domain.cpp
r1870 r1871 1850 1850 TRY 1851 1851 { 1852 if (computeConnectedClients_done_.count(client) ==0) return ;1852 if (computeConnectedClients_done_.count(client)!=0) return ; 1853 1853 else computeConnectedClients_done_.insert(client) ; 1854 1854 -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.cpp
r1870 r1871 1192 1192 { 1193 1193 grid->makeTransformGrid() ; // make the grid transformation 1194 std::pair<std::shared_ptr<CFilter>, std::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridSrc, grid, detectMissingValues, defaultValue); 1195 lastFilter->connectOutput(filters.first, 0); 1196 lastFilter = filters.second; 1197 gridSrc=grid ; 1194 if (grid->hasTransform()) 1195 { 1196 std::pair<std::shared_ptr<CFilter>, std::shared_ptr<CFilter> > filters = CSpatialTransformFilter::buildFilterGraph(gc, gridSrc, grid, detectMissingValues, defaultValue); 1197 lastFilter->connectOutput(filters.first, 0); 1198 lastFilter = filters.second; 1199 gridSrc=grid ; 1200 } 1198 1201 } 1199 1202 instantDataFilter = lastFilter ; … … 1205 1208 { 1206 1209 if (!grid_->checkIfCompleted()) return false ; 1207 grid_->solveElementsRefInheritance() ; 1208 grid_->completeGrid(); // grid generation, to be checked 1209 grid_->checkElementsAttributes() ; 1210 instantDataFilter=inputFilter ; 1211 setModelIn() ; // no reference, the field is potentially a source field from model 1210 1211 if (hasFileIn()) // input file, attemp to read the grid from file 1212 { 1213 // must be checked 1214 fileIn_->initRead() ; 1215 fileIn_->checkReadFile(); 1216 grid_->solveElementsRefInheritance() ; 1217 if (fileIn_->isClientSide()) fileIn_->readFieldAttributesMetaData(this); 1218 grid_->completeGrid(); // grid generation, to be checked 1219 if (fileIn_->isClientSide()) fileIn_->readFieldAttributesValues(this); 1220 grid_->checkElementsAttributes() ; 1221 grid_->solveDomainAxisBaseRef(); 1222 // probably in future tag grid incomplete if coming from a reading 1223 instantDataFilter=inputFilter ; 1224 } 1225 else 1226 { 1227 setModelIn() ; // no reference, the field is potentially a source field from model 1228 1229 grid_->solveElementsRefInheritance() ; 1230 grid_->completeGrid(); // grid generation, to be checked 1231 grid_->checkElementsAttributes() ; 1232 instantDataFilter=inputFilter ; 1233 } 1212 1234 } 1213 1235 … … 1225 1247 { 1226 1248 // insert temporal filter before sending to files 1227 fileWriterFilter = std::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this, file->getContextClient()));1249 fileWriterFilter = std::shared_ptr<CFileWriterFilter>(new CFileWriterFilter(gc, this, client)); 1228 1250 // insert temporal filter before sending to files 1229 getTemporalDataFilter(gc, file ->output_freq)->connectOutput(fileWriterFilter, 0);1251 getTemporalDataFilter(gc, fileOut_->output_freq)->connectOutput(fileWriterFilter, 0); 1230 1252 } 1231 1253 … … 1251 1273 } 1252 1274 1275 /*! 1276 * Connect field to a source filter to receive data from a client (on server side). 1277 */ 1278 void CField::connectToClientInput(CGarbageCollector& gc) 1279 { 1280 clientSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false)); 1281 clientSourceFilter -> connectOutput(inputFilter,0) ; 1282 } 1283 1284 1285 /*! 1286 * Connect field to a source filter to receive data from a server (on client side). 1287 */ 1288 void CField::connectToServerInput(CGarbageCollector& gc) 1289 { 1290 serverSourceFilter = std::shared_ptr<CSourceFilter>(new CSourceFilter(gc, grid_, false, false)); 1291 serverSourceFilter -> connectOutput(inputFilter,0) ; 1292 } 1293 1294 /*! 1295 * Connect field to a file writer filter to write data in file (on server side). 1296 */ 1297 void CField::connectToFileWriter(CGarbageCollector& gc) 1298 { 1299 fileServerWriterFilter = std::shared_ptr<CFileServerWriterFilter>(new CFileServerWriterFilter(gc, this)); 1300 instantDataFilter->connectOutput(fileServerWriterFilter, 0); 1301 } 1302 1303 1304 /*! 1305 * Connect field to a store filter to output data to model on client Side 1306 */ 1307 void CField::connectToModelOutput(CGarbageCollector& gc) 1308 { 1309 const bool detectMissingValues = (!detect_missing_value.isEmpty() && !default_value.isEmpty() && detect_missing_value == true); 1310 const double defaultValue = detectMissingValues ? default_value : (!default_value.isEmpty() ? default_value : 0.0); 1311 1312 storeFilter = std::shared_ptr<CStoreFilter>(new CStoreFilter(gc, CContext::getCurrent(), grid_, detectMissingValues, defaultValue)); 1313 instantDataFilter->connectOutput(storeFilter, 0); 1314 } 1315 1253 1316 /*! 1254 1317 * Transform the grid_path attribut into vector of grid. -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/field.hpp
r1870 r1871 227 227 void connectToFileServer(CGarbageCollector& gc) ; 228 228 void connectToModelInput(CGarbageCollector& gc) ; 229 void connectToFileWriter(CGarbageCollector& gc) ; 230 void connectToClientInput(CGarbageCollector& gc) ; 231 void connectToServerInput(CGarbageCollector& gc) ; 232 void connectToModelOutput(CGarbageCollector& gc); 233 229 234 void computeGridIndexToFileServer(void) ; 230 235 … … 246 251 247 252 CFile* fileIn_ = nullptr ; //<! pointer to input related file 248 bool hasFileIn(void) { fileIn_==nullptr ? false : true ;}253 bool hasFileIn(void) { return fileIn_==nullptr ? false : true ;} 249 254 CFile* getFileIn(void) {return fileIn_;} 250 255 void setFileIn(CFile* fileIn) { fileIn_ = fileIn ;} … … 252 257 253 258 CFile* fileOut_ = nullptr ; //<! pointer to output related file 254 bool hasFileOut(void) { fileOut_==nullptr ? false : true ;}259 bool hasFileOut(void) { return fileOut_==nullptr ? false : true ;} 255 260 CFile* getFileOut(void) {return fileOut_;} 256 261 void setFileOut(CFile* fileOut) { fileOut_ = fileOut ;} … … 258 263 259 264 CCouplerIn* couplerIn_ = nullptr ; //<!pointer to input related coupler 260 bool hasCouplerIn(void) { couplerIn_==nullptr ? false : true ;}265 bool hasCouplerIn(void) { return couplerIn_==nullptr ? false : true ;} 261 266 CCouplerIn* getCouplerIn(void) {return couplerIn_;} 262 267 void setCouplerIn(CCouplerIn* couplerIn) { couplerIn_ = couplerIn ;} … … 264 269 265 270 CCouplerOut* couplerOut_ = nullptr ; //<!pointer to output related coupler 266 bool hasCouplerOut(void) { couplerOut_==nullptr ? false : true ;}271 bool hasCouplerOut(void) { return couplerOut_==nullptr ? false : true ;} 267 272 CCouplerOut* getCouplerOut(void) {return couplerOut_;} 268 273 void setCouplerOut(CCouplerOut* couplerOut) { couplerOut_ = couplerOut ;} -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/file.cpp
r1870 r1871 345 345 // TODO: This condition should be changed soon. It only works with maximum number of level as 2 346 346 347 //ym if (CServer::serverLevel == 0 || CServer::serverLevel == 1) 348 if (context->getServiceType()==CServicesManager::IO_SERVER || context->getServiceType()==CServicesManager::GATHERER) 349 { 347 //ym if (CServer::serverLevel == 0 || CServer::serverLevel == 1) 348 // ym client must doing it also 349 // if (context->getServiceType()==CServicesManager::IO_SERVER || context->getServiceType()==CServicesManager::GATHERER) 350 // { 350 351 if (!mode.isEmpty() && mode.getValue() == mode_attr::read) 351 352 { … … 357 358 } 358 359 //checkSplit(); // Really need for reading? 359 }360 // } 360 361 } 361 362 CATCH_DUMP_ATTR … … 756 757 CATCH_DUMP_ATTR 757 758 759 void CFile::readFieldAttributesMetaData(CField* field) 760 { 761 this->data_in->readFieldAttributesMetaData(field); 762 } 763 764 void CFile::readFieldAttributesValues(CField* field) 765 { 766 this->data_in->readFieldAttributesValues(field); 767 } 758 768 /*! 759 769 \brief Parse xml file and write information into file object -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/file.hpp
r1870 r1871 104 104 void close(void); 105 105 void readAttributesOfEnabledFieldsInReadMode(); 106 106 void readFieldAttributesMetaData(CField* field) ; 107 void readFieldAttributesValues(CField* field) ; 107 108 // Some processing on file 108 109 void solveFieldRefInheritance(bool apply); … … 197 198 std::vector<CField*> enabledFields; 198 199 200 private: 201 bool isClientSide_ ; // the file is on client side or on server side ? 202 public: 203 bool isClientSide(void) { return isClientSide_ ;} 204 bool isServerSide(void) { return !isClientSide_ ;} 205 void setClientSide(void) { isClientSide_=true ;} 206 void setServerSide(void) { isClientSide_=false ;} 207 208 private: 209 199 210 200 211 public: -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/grid.cpp
r1870 r1871 31 31 , vScalarGroup_(), scalarList_(), isScalarListSet(false) 32 32 , clientDistribution_(0), isIndexSent(false) , serverDistribution_(0), clientServerMap_(0) 33 , writtenDataSize_(0),numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)33 , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 34 34 , connectedDataSize_(), connectedServerRank_(), connectedServerRankRead_(), connectedDataSizeRead_() 35 35 , isCompressible_(false) … … 52 52 , vScalarGroup_(), scalarList_(), isScalarListSet(false) 53 53 , clientDistribution_(0), isIndexSent(false) , serverDistribution_(0), clientServerMap_(0) 54 , writtenDataSize_(0),numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0)54 , numberWrittenIndexes_(0), totalNumberWrittenIndexes_(0), offsetWrittenIndexes_(0) 55 55 , connectedDataSize_(), connectedServerRank_(), connectedServerRankRead_(), connectedDataSizeRead_() 56 56 , isCompressible_(false) … … 1665 1665 TRY 1666 1666 { 1667 CContextClient* client = server->getAssociatedClient(); 1668 connectedServerRankRead_ = ranks; 1669 for (int n = 0; n < ranks.size(); n++) 1670 { 1671 int rank = ranks[n]; 1672 CBufferIn& buffer = *buffers[n]; 1673 buffer >> isCompressible_; // probably to be removed later 1674 CArray<size_t,1> outIndex; 1675 buffer >> outIndex; 1676 outGlobalIndexFromClient_.insert(std::make_pair(rank, outIndex)); 1677 connectedDataSizeRead_[rank] = outIndex.numElements(); 1678 } 1679 1680 nbReadSenders_[client] = CClientServerMappingDistributed::computeConnectedClients(client->serverSize, client->clientSize, 1681 client->intraComm, ranks); 1682 } 1683 CATCH_DUMP_ATTR 1684 1685 1686 void CGrid::computeServerDistribution(void) 1687 TRY 1688 { 1689 if (computeServerDistribution_done_) return ; 1690 else computeServerDistribution_done_=true ; 1691 1692 CContext* context = CContext::getCurrent(); 1693 1694 int idx = 0, numElement = axis_domain_order.numElements(); 1695 int ssize = numElement; 1696 std::vector<int> indexMap(numElement); 1697 for (int i = 0; i < numElement; ++i) 1698 { 1699 indexMap[i] = idx; 1700 if (2 == axis_domain_order(i)) 1701 { 1702 ++ssize; 1703 idx += 2; 1704 } 1705 else 1706 ++idx; 1707 } 1708 1709 for (int n = 0; n < connectedServerRankRead_.size(); n++) 1710 { 1711 int rank = connectedServerRankRead_[n]; 1712 size_t dataSize = 0; 1713 1714 if (0 == serverDistribution_) 1715 { 1716 int axisId = 0, domainId = 0, scalarId = 0, globalSize = 1; 1717 std::vector<CDomain*> domainList = getDomains(); 1718 std::vector<CAxis*> axisList = getAxis(); 1719 std::vector<int> nBegin(ssize), nSize(ssize), nGlob(ssize), nBeginGlobal(ssize), nGlobElement(numElement); 1720 std::vector<CArray<int,1> > globalIndex(numElement); 1721 for (int i = 0; i < numElement; ++i) 1722 { 1723 nGlobElement[i] = globalSize; 1724 if (2 == axis_domain_order(i)) //domain 1725 { 1726 nBegin[indexMap[i]] = domainList[domainId]->ibegin; 1727 nSize[indexMap[i]] = domainList[domainId]->ni; 1728 nBeginGlobal[indexMap[i]] = 0; 1729 nGlob[indexMap[i]] = domainList[domainId]->ni_glo; 1730 1731 nBegin[indexMap[i] + 1] = domainList[domainId]->jbegin; 1732 nSize[indexMap[i] + 1] = domainList[domainId]->nj; 1733 nBeginGlobal[indexMap[i] + 1] = 0; 1734 nGlob[indexMap[i] + 1] = domainList[domainId]->nj_glo; 1735 1736 { 1737 int count = 0; 1738 globalIndex[i].resize(nSize[indexMap[i]]*nSize[indexMap[i]+1]); 1739 for (int jdx = 0; jdx < nSize[indexMap[i]+1]; ++jdx) 1740 for (int idx = 0; idx < nSize[indexMap[i]]; ++idx) 1741 { 1742 globalIndex[i](count) = (nBegin[indexMap[i]] + idx) + (nBegin[indexMap[i]+1] + jdx) * nGlob[indexMap[i]]; 1743 ++count; 1744 } 1745 } 1746 1747 ++domainId; 1748 } 1749 else if (1 == axis_domain_order(i)) // axis 1750 { 1751 nBegin[indexMap[i]] = axisList[axisId]->begin; 1752 nSize[indexMap[i]] = axisList[axisId]->n; 1753 nBeginGlobal[indexMap[i]] = 0; 1754 nGlob[indexMap[i]] = axisList[axisId]->n_glo; 1755 globalIndex[i].resize(nSize[indexMap[i]]); 1756 for (int idx = 0; idx < nSize[indexMap[i]]; ++idx) 1757 globalIndex[i](idx) = nBegin[indexMap[i]] + idx; 1758 1759 ++axisId; 1760 } 1761 else // scalar 1762 { 1763 nBegin[indexMap[i]] = 0; 1764 nSize[indexMap[i]] = 1; 1765 nBeginGlobal[indexMap[i]] = 0; 1766 nGlob[indexMap[i]] = 1; 1767 globalIndex[i].resize(1); 1768 globalIndex[i](0) = 0; 1769 ++scalarId; 1770 } 1771 } 1772 dataSize = 1; 1773 1774 for (int i = 0; i < nSize.size(); ++i) 1775 dataSize *= nSize[i]; 1776 serverDistribution_ = new CDistributionServer(context->intraCommRank_, 1777 globalIndex, axis_domain_order, 1778 nBegin, nSize, nBeginGlobal, nGlob); 1779 } 1780 } 1781 } 1782 CATCH_DUMP_ATTR 1783 1784 1785 1786 1787 1788 1789 /* old interface => transform into compute receivedIndex 1790 void CGrid::recvIndex(vector<int> ranks, vector<CBufferIn*> buffers, CContextServer* server) 1791 TRY 1792 { 1667 1793 CContext* context = CContext::getCurrent(); 1668 1794 connectedServerRankRead_ = ranks; … … 1829 1955 } 1830 1956 CATCH_DUMP_ATTR 1957 */ 1958 1831 1959 1832 1960 /* … … 1922 2050 TRY 1923 2051 { 1924 return (0 != writtenDataSize_);2052 return (0 != getWrittenDataSize()); 1925 2053 } 1926 2054 CATCH_DUMP_ATTR … … 1932 2060 \return size of data written on server 1933 2061 */ 1934 size_t CGrid::getWrittenDataSize() const1935 TRY 1936 { 1937 return writtenDataSize_;2062 size_t CGrid::getWrittenDataSize() 2063 TRY 2064 { 2065 return getServerDistribution()->getGridSize(); 1938 2066 } 1939 2067 CATCH … … 1972 2100 CATCH 1973 2101 1974 CDistributionServer* CGrid::getDistributionServer() 1975 TRY 1976 { 1977 return serverDistribution_; 1978 } 1979 CATCH_DUMP_ATTR 1980 2102 1981 2103 CDistributionClient* CGrid::getClientDistribution() 1982 2104 TRY … … 2113 2235 gridPtr->sendCreateChild(this->getId(),client); 2114 2236 this->sendAllAttributesToServer(client); 2237 if (isScalarGrid()) sendIndexScalarGrid(); 2238 else sendIndex(); 2115 2239 this->sendAllDomains(client); 2116 2240 this->sendAllAxis(client); -
XIOS/dev/dev_ym/XIOS_COUPLING/src/node/grid.hpp
r1870 r1871 194 194 bool doGridHaveDataToWrite(); 195 195 bool doGridHaveDataDistributed(CContextClient* client = 0); 196 size_t getWrittenDataSize() const;196 size_t getWrittenDataSize() ; 197 197 int getNumberWrittenIndexes() const; 198 198 int getTotalNumberWrittenIndexes() const; 199 199 int getOffsetWrittenIndexes() const; 200 200 201 CDistributionServer* getDistributionServer();202 201 CGridTransformation* getTransformations(); 203 202 … … 248 247 CDistributionClient* getClientDistribution(void); 249 248 249 private: 250 /** Server-like distribution calculated upon receiving indexes */ 251 CDistributionServer* serverDistribution_; 252 void computeServerDistribution(void) ; 253 bool computeServerDistribution_done_=false ; 254 public: 255 CDistributionServer* getServerDistribution(void) { if (computeServerDistribution_done_) computeServerDistribution() ; return serverDistribution_ ;} 256 257 250 258 private: 251 259 template<int N> … … 413 421 bool isAxisListSet, isDomListSet, isScalarListSet; 414 422 415 /** Server-like distribution calculated upon receiving indexes */416 CDistributionServer* serverDistribution_;417 418 423 CClientServerMapping* clientServerMap_; 419 size_t writtenDataSize_;420 424 int numberWrittenIndexes_, totalNumberWrittenIndexes_, offsetWrittenIndexes_; 421 425
Note: See TracChangeset
for help on using the changeset viewer.