31 , calendar(), hasClient(false), hasServer(false)
32 , isPostProcessed(false), finalized(false)
33 , idServer_(), client(0), server(0)
34 , allProcessed(false), countChildCtx_(0)
40 , calendar(), hasClient(false), hasServer(false)
41 , isPostProcessed(false), finalized(false)
42 , idServer_(), client(0), server(0)
43 , allProcessed(false), countChildCtx_(0)
97 this->calendar = newCalendar;
114 if (attributes.end() != attributes.find(
"src"))
116 StdIFStream ifs ( attributes[
"src"].c_str() , StdIFStream::in );
117 if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 )
118 ERROR(
"void CContext::parse(xml::CXMLNode & node)",
119 <<endl<<
"Can not open <"<<attributes[
"src"].c_str()<<
"> file" );
121 ERROR(
"CContext::parse(xml::CXMLNode & node)",
122 <<
"[ filename = " << attributes[
"src"] <<
" ] Bad xml stream !");
127 DEBUG(
"Le noeud is wrong defined but will be considered as a context !");
129 if (!(node.goToChildElement()))
131 DEBUG(
"Le context ne contient pas d'enfant !");
139 attributes = node.getAttributes();
141 if (attributes.end() != attributes.find(
"id"))
143 DEBUG(<<
"Definition node has an id,"
144 <<
"it will not be taking account !");
147 #define DECLARE_NODE(Name_, name_) \
148 if (name.compare(C##Name_##Definition::GetDefName()) == 0) \
149 { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; }
150 #define DECLARE_NODE_PAR(Name_, name_)
153 DEBUG(<<
"The element \'" << name
155 <<
"\' is not a definition !");
157 }
while (node.goToNextElement());
159 node.goToParentElement();
170 std::vector<CContext*> def_vector =
172 std::vector<CContext*>::iterator
173 it = def_vector.begin(), end = def_vector.end();
175 out <<
"<? xml version=\"1.0\" ?>" << std::endl;
178 for (; it != end; it++)
182 out << *context << std::endl;
198 <<
" id=\"" << this->
getId() <<
"\" "
199 << SuperClassAttribute::toString() <<
">" << std::endl;
207 #define DECLARE_NODE(Name_, name_) \
208 if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
209 oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl;
210 #define DECLARE_NODE_PAR(Name_, name_)
230 #define DECLARE_NODE(Name_, name_) \
231 if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \
232 C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply);
233 #define DECLARE_NODE_PAR(Name_, name_)
246 C##Name_##Definition::has(C##Name_##Definition::GetDefName()) ||
248 #include
"node_type.conf"
258 #define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes();
259 #define DECLARE_NODE_PAR(Name_, name_)
272 MPI_Comm intraCommServer, interCommServer;
278 client =
new CContextClient(
this, intraComm, interComm, cxtServer);
281 intraCommServer = intraComm;
282 interCommServer = interComm;
286 MPI_Comm_dup(intraComm, &intraCommServer);
287 comms.push_back(intraCommServer);
288 MPI_Comm_dup(interComm, &interCommServer);
289 comms.push_back(interCommServer);
293 string contextRegistryId=getId() ;
294 size_t pos=contextRegistryId.find(
"_server_") ;
295 if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ;
298 registryIn->setPath(contextRegistryId) ;
299 if (client->clientRank==0) registryIn->fromFile(
"xios_registry.bin") ;
300 registryIn->bcastRegistry() ;
303 registryOut->setPath(contextRegistryId) ;
305 server =
new CContextServer(
this, intraCommServer, interCommServer);
310 clientPrimServer.push_back(
new CContextClient(
this, intraComm, interComm));
311 MPI_Comm_dup(intraComm, &intraCommServer);
312 comms.push_back(intraCommServer);
313 MPI_Comm_dup(interComm, &interCommServer);
314 comms.push_back(interCommServer);
315 serverPrimServer.push_back(
new CContextServer(
this, intraCommServer, interCommServer));
335 #define DECLARE_NODE(Name_, name_) \
336 if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition);
337 #define DECLARE_NODE_PAR(Name_, name_)
340 #undef DECLARE_NODE_PAR
343 std::map<int, StdSize> maxEventSize;
344 std::map<int, StdSize> bufferSize = getAttributesBufferSize(maxEventSize, contextClient, bufferForWriting);
345 std::map<int, StdSize> dataBufferSize = getDataBufferSize(maxEventSize, contextClient, bufferForWriting);
347 std::map<int, StdSize>::iterator it, ite = dataBufferSize.end();
348 for (it = dataBufferSize.begin(); it != ite; ++it)
349 if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second;
352 ite = bufferSize.end();
353 for (it = bufferSize.begin(); it != ite; ++it)
356 if (it->second < minBufferSize) it->second = minBufferSize;
361 if (contextClient->isServerLeader())
363 const std::list<int>& ranks = contextClient->getRanksServerLeader();
364 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
366 if (!bufferSize.count(*itRank))
368 bufferSize[*itRank] = minBufferSize;
369 maxEventSize[*itRank] = minEventSize;
373 contextClient->setBufferSize(bufferSize, maxEventSize);
393 string contextRegistryId=getId() ;
394 size_t pos=contextRegistryId.find(
"_server_") ;
395 if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ;
398 registryIn->setPath(contextRegistryId) ;
399 if (server->intraCommRank==0) registryIn->fromFile(
"xios_registry.bin") ;
400 registryIn->bcastRegistry() ;
402 registryOut->setPath(contextRegistryId) ;
404 MPI_Comm intraCommClient, interCommClient;
407 intraCommClient = intraComm;
408 interCommClient = interComm;
412 MPI_Comm_dup(intraComm, &intraCommClient);
413 comms.push_back(intraCommClient);
414 MPI_Comm_dup(interComm, &interCommClient);
415 comms.push_back(interCommClient);
417 client =
new CContextClient(
this,intraCommClient,interCommClient, cxtClient);
425 bool clientReady, serverFinished;
430 client->checkBuffers();
431 bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent();
432 if (hasTmpBufferedEvent)
433 hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent();
435 return server->eventLoop(!hasTmpBufferedEvent || !enableEventsProcessing);
440 client->checkBuffers();
441 bool serverFinished =
true;
443 serverFinished = server->eventLoop(enableEventsProcessing);
444 bool serverPrimFinished =
true;
445 for (
int i = 0; i < clientPrimServer.size(); ++i)
448 clientPrimServer[i]->checkBuffers();
450 serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing);
452 return ( serverFinished && serverPrimFinished);
457 client->checkBuffers();
458 return server->eventLoop(enableEventsProcessing);
504 for (std::list<MPI_Comm>::iterator it =
comms.begin(); it !=
comms.end(); ++it)
505 MPI_Comm_free(&(*it));
508 info(20)<<
"CContext: Context <"<<
getId()<<
"> is finalized."<<endl;
523 info(100)<<
"DEBUG: context "<<
getId()<<
" Send client finalize<<"<<endl ;
530 }
while (!bufferReleased);
546 for (std::list<MPI_Comm>::iterator it =
comms.begin(); it !=
comms.end(); ++it)
547 MPI_Comm_free(&(*it));
550 info(20)<<
"CContext: Context <"<<
getId()<<
"> is finalized."<<endl;
562 for (std::list<MPI_Comm>::iterator it =
comms.begin(); it !=
comms.end(); ++it)
563 MPI_Comm_free(&(*it));
651 for (
int i = 0; i < nbSrvPools; ++i)
664 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
665 event.
push(*itRank,1,msg);
679 get(id)->recvPostProcessingGlobalAttributes(*buffer);
686 postProcessingGlobalAttributes();
737 for (
unsigned int i = 0; i < activeFiles.size(); i++)
738 (
void)activeFiles[i]->getEnabledFields();
753 int size = activeFiles.size();
754 for (
int i = 0; i < size; ++i)
756 activeFiles[i]->sendGridComponentOfEnabledFields();
768 int size = activeFiles.size();
769 for (
int i = 0; i < size; ++i)
771 activeFiles[i]->sendGridOfEnabledFields();
780 for (
int i = 0; i < size; ++i)
794 int size = activeFiles.size();
795 for (
int i = 0; i < size; ++i)
797 activeFiles[i]->checkGridOfEnabledFields();
810 int size = this->enabledFiles.size();
811 for (
int i = 0; i < size; ++i)
813 this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer);
816 for (
int i = 0; i < size; ++i)
818 this->enabledFiles[i]->generateNewTransformationGridDest();
832 int size = this->enabledFiles.size();
833 for (
int i = 0; i < size; ++i)
835 this->enabledFiles[i]->solveAllRefOfEnabledFieldsAndTransform(sendToServer);
844 for (
int i = 0; i < size; ++i)
855 for (
int i = 0; i < size; ++i)
866 for (
int i = 0; i < size; ++i)
877 for (
int i = 0; i < size; ++i)
888 for (
int i = 0; i < size; ++i)
900 for (
size_t i = 0; i < allFields.size(); ++i)
902 CField* field = allFields[i];
904 if (field->
file && !field->
file->mode.isEmpty() && field->
file->mode == CFile::mode_attr::read)
905 field->read_access =
true;
906 else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled))
933 solveDescInheritance(apply);
939 if (hasClient && !hasServer)
942 for (
unsigned int i = 0; i < allFiles.size(); i++)
943 allFiles[i]->solveFieldRefInheritance(apply);
946 unsigned int vecSize = allGrids.size();
948 for (i = 0; i < vecSize; ++i)
949 allGrids[i]->solveDomainAxisRefInheritance(apply);
960 for (
unsigned int i = 0; i < allFiles.size(); i++)
961 if (!allFiles[i]->enabled.isEmpty())
963 if (allFiles[i]->enabled.getValue())
965 if (allFiles[i]->output_freq.isEmpty())
967 ERROR(
"CContext::findEnabledFiles()",
968 <<
"Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
971 if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->
getCalendar()->getTimeStep()))
973 error(0)<<
"WARNING: void CContext::findEnabledFiles()"<<endl
974 <<
"Output frequency in file \""<<allFiles[i]->getFileOutputName()
975 <<
"\" is less than the time step. File will not be written."<<endl;
983 if (allFiles[i]->output_freq.isEmpty())
985 ERROR(
"CContext::findEnabledFiles()",
986 <<
"Mandatory attribute output_freq must be defined for file \""<<allFiles[i]->getFileOutputName()
989 if ( (initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->
getCalendar()->getTimeStep()))
991 error(0)<<
"WARNING: void CContext::findEnabledFiles()"<<endl
992 <<
"Output frequency in file \""<<allFiles[i]->getFileOutputName()
993 <<
"\" is less than the time step. File will not be written."<<endl;
1000 DEBUG(<<
"Aucun fichier ne va être sorti dans le contexte nommé \""
1001 <<
getId() <<
"\" !");
1009 bool distFileMemory=false ;
1010 distFileMemory=CXios::getin<bool>(
"server2_dist_file_memory", distFileMemory);
1020 double eps=std::numeric_limits<double>::epsilon()*10 ;
1025 std::ofstream ofs((
"distribute_file_"+
getId()+
".dat").c_str(), std::ofstream::out);
1037 std::vector<std::pair<double, CFile*> > dataSizeMap;
1038 double dataPerPool = 0;
1041 for (
size_t i = 0; i < size; ++i)
1044 ofs<<file->
getId()<<endl ;
1047 size_t numEnabledFields = enabledFields.size();
1048 ofs<<numEnabledFields<<endl ;
1049 for (
size_t j = 0; j < numEnabledFields; ++j)
1051 dataSize += enabledFields[j]->getGlobalWrittenSize() ;
1052 ofs<<enabledFields[j]->grid->getId()<<endl ;
1053 ofs<<enabledFields[j]->getGlobalWrittenSize()<<endl ;
1056 double dataSizeSec= dataSize/ outFreqSec;
1057 ofs<<dataSizeSec<<endl ;
1060 dataSizeMap.push_back(make_pair(dataSizeSec + dataSizeSec * eps * nfield , file));
1061 dataPerPool += dataSizeSec;
1063 dataPerPool /= nbPools;
1064 std::sort(dataSizeMap.begin(), dataSizeMap.end());
1068 std::multimap<double,int> poolDataSize ;
1073 for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair<double,int>(0.,j)) ;
1075 for (
int i = dataSizeMap.size()-1; i >= 0; --i)
1077 dataSize=(*poolDataSize.begin()).first ;
1078 j=(*poolDataSize.begin()).second ;
1080 dataSize+=dataSizeMap[i].first;
1081 poolDataSize.erase(poolDataSize.begin()) ;
1082 poolDataSize.insert(std::pair<double,int>(dataSize,j)) ;
1085 for (std::multimap<double,int>:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it)
info(30)<<
"Load Balancing for servers (perfect=1) : "<<it->second<<
" : ratio "<<it->first*1./dataPerPool<<endl ;
1108 ratio=CXios::getin<double>(
"server2_dist_file_memory_ratio", ratio);
1111 vector<SDistFile> files(nFiles);
1112 vector<SDistGrid> grids;
1113 map<string,int> gridMap ;
1117 for (
size_t i = 0; i < nFiles; ++i)
1122 size_t numEnabledFields = enabledFields.size();
1124 files[i].id_=file->
getId() ;
1125 files[i].nbGrids_=numEnabledFields;
1126 files[i].assignedGrid_ =
new int[files[i].nbGrids_] ;
1128 for (
size_t j = 0; j < numEnabledFields; ++j)
1130 gridId=enabledFields[j]->grid->getId() ;
1131 if (gridMap.find(gridId)==gridMap.end())
1133 gridMap[gridId]=gridIndex ;
1135 grids.push_back(newGrid) ;
1138 files[i].assignedGrid_[j]=gridMap[gridId] ;
1139 grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ;
1140 dataSize += enabledFields[j]->getGlobalWrittenSize() ;
1143 files[i].bandwith_= dataSize/ outFreqSec ;
1149 for(
int i=0; i<nFiles; i++) bandwith+=files[i].bandwith_ ;
1150 for(
int i=0; i<nFiles; i++) files[i].bandwith_ = files[i].bandwith_/bandwith * ratio ;
1152 for(
int i=0; i<grids.size(); i++) memory+=grids[i].size_ ;
1153 for(
int i=0; i<grids.size(); i++) grids[i].size_ = grids[i].size_ / memory * (1.0-ratio) ;
1157 vector<double> memorySize(nbPools,0.) ;
1158 vector< set<int> > serverGrids(nbPools) ;
1159 vector<double> bandwithSize(nbPools,0.) ;
1161 for (
size_t i = 0; i < nFiles; ++i)
1163 bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ;
1164 for(
int j=0 ; j<files[i].nbGrids_;j++)
1166 if (serverGrids[files[i].assignedServer_].find(files[i].assignedGrid_[j]) == serverGrids[files[i].assignedServer_].end())
1168 memorySize[files[i].assignedServer_]+= grids[files[i].assignedGrid_[j]].size_ * memory / (1.0-ratio);
1169 serverGrids[files[i].assignedServer_].insert(files[i].assignedGrid_[j]) ;
1173 delete [] files[i].assignedGrid_ ;
1176 for (
int i = 0; i < nbPools; ++i)
info(100)<<
"Pool server level2 "<<i<<
" assigned file bandwith "<<bandwithSize[i]*86400.*4./1024/1024.<<
" Mb / days"<<endl ;
1177 for (
int i = 0; i < nbPools; ++i)
info(100)<<
"Pool server level2 "<<i<<
" assigned grid memory "<<memorySize[i]*100/1024./1024.<<
" Mb"<<endl ;
1201 for (
int i = 0; i < size; ++i)
1217 for (
int i = 0; i < size; ++i)
1228 std::vector<CFile*>::const_iterator
1231 for (; it != end; it++)
1233 info(30)<<
"Closing File : "<<(*it)->getId()<<endl;
1250 if (SuperClass::dispatchEvent(event))
return true;
1255 case EVENT_ID_CLOSE_DEFINITION :
1256 recvCloseDefinition(event);
1259 case EVENT_ID_UPDATE_CALENDAR:
1260 recvUpdateCalendar(event);
1263 case EVENT_ID_CREATE_FILE_HEADER :
1264 recvCreateFileHeader(event);
1267 case EVENT_ID_POST_PROCESS:
1268 recvPostProcessing(event);
1270 case EVENT_ID_SEND_REGISTRY:
1271 recvRegistry(event);
1274 case EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES:
1275 recvPostProcessingGlobalAttributes(event);
1278 case EVENT_ID_PROCESS_GRID_ENABLED_FIELDS:
1279 recvProcessingGridOfEnabledFields(event);
1283 ERROR(
"bool CContext::dispatchEvent(CEventServer& event)",
1297 for (
int i = 0; i < nbSrvPools; ++i)
1309 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1310 event.
push(*itRank,1,msg);
1313 else contextClientTmp->
sendEvent(event);
1325 get(id)->closeDefinition();
1334 int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1;
1335 for (
int i = 0; i < nbSrvPools; ++i)
1337 CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client;
1338 CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR);
1344 msg<<this->getIdServer(i)<<step;
1346 msg<<this->getIdServer()<<step;
1348 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1349 event.
push(*itRank,1,msg);
1352 else contextClientTmp->
sendEvent(event);
1364 get(id)->recvUpdateCalendar(*buffer);
1374 updateCalendar(step);
1375 if (hasClient && hasServer)
1377 sendUpdateCalendar(step);
1389 for (
int i = 0; i < nbSrvPools; ++i)
1402 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1403 event.
push(*itRank,1,msg) ;
1406 else contextClientTmp->
sendEvent(event);
1418 get(id)->recvCreateFileHeader(*buffer);
1426 if (!hasClient && hasServer)
1437 for (
int i = 0; i < nbSrvPools; ++i)
1450 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1451 event.
push(*itRank,1,msg);
1454 else contextClientTmp->
sendEvent(event);
1476 for (
int i = 0; i < nbSrvPools; ++i)
1488 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
1489 event.
push(*itRank,1,msg);
1492 else contextClientTmp->
sendEvent(event);
1504 get(id)->recvPostProcessing(*buffer);
1533 idServer_ = this->getId();
1534 idServer_ +=
"_server_";
1535 idServer_ += std::to_string(static_cast<unsigned long long>(i));
1553 ERROR(
"CContext::postProcessing()", <<
"A calendar must be defined for the context \"" <<
getId() <<
"!\"")
1555 ERROR(
"CContext::postProcessing()", <<
"A timestep must be defined for the context \"" <<
getId() <<
"!\"")
1627 std::vector<CFile*>& fileList = this->enabledFiles;
1628 size_t numEnabledFiles = fileList.size();
1629 for (
size_t i = 0; i < numEnabledFiles; ++i)
1632 CFile* file = fileList[i];
1634 size_t numEnabledFields = enabledFields.size();
1635 for (
size_t j = 0; j < numEnabledFields; ++j)
1637 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting);
1638 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
1639 for (; it != itE; ++it)
1643 if (attributesSize[it->first] < it->second)
1644 attributesSize[it->first] = it->second;
1646 if (maxEventSize[it->first] < it->second)
1647 maxEventSize[it->first] = it->second;
1651 return attributesSize;
1666 std::map<int, StdSize> dataSize;
1669 std::vector<CFile*>& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles;
1670 size_t numEnabledFiles = fileList.size();
1671 for (
size_t i = 0; i < numEnabledFiles; ++i)
1673 CFile* file = fileList[i];
1677 size_t numEnabledFields = enabledFields.size();
1678 for (
size_t j = 0; j < numEnabledFields; ++j)
1681 const std::map<int, StdSize> mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting);
1682 std::map<int, StdSize>::const_iterator it = mapSize.begin(), itE = mapSize.end();
1683 for (; it != itE; ++it)
1688 dataSize[it->first] += it->second;
1689 else if (dataSize[it->first] < it->second)
1690 dataSize[it->first] = it->second;
1692 if (maxEventSize[it->first] < it->second)
1693 maxEventSize[it->first] = it->second;
1706 int size = activeFiles.size();
1711 StdString fileDefRoot(
"file_definition");
1712 CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot);
1714 for (
int i = 0; i < size; ++i)
1716 CFile*
f = activeFiles[i];
1728 int size = activeFiles.size();
1729 for (
int i = 0; i < size; ++i)
1731 activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient());
1743 for (vector<CAxis*>::const_iterator it = allAxis.begin(); it != allAxis.end(); it++)
1744 (*it)->checkEligibilityForCompressedOutput();
1747 for (vector<CDomain*>::const_iterator it = allDomains.begin(); it != allDomains.end(); it++)
1748 (*it)->checkEligibilityForCompressedOutput();
1751 for (vector<CGrid*>::const_iterator it = allGrids.begin(); it != allGrids.end(); it++)
1752 (*it)->checkEligibilityForCompressedOutput();
1763 for (
size_t i = 0; i < allFiles.size(); i++)
1765 CFile* file = allFiles[i];
1767 std::vector<CVariable*> fileVars, fieldVars, vars = file->
getAllVariables();
1768 for (
size_t k = 0; k < vars.size(); k++)
1772 if (var->ts_target.isEmpty()
1773 || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)
1774 fileVars.push_back(var);
1776 if (!var->ts_target.isEmpty()
1777 && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both))
1778 fieldVars.push_back(var);
1781 if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none)
1784 StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ;
1787 size_t pos=tsPrefix.find(fileNameStr) ;
1788 while (pos!=std::string::npos)
1790 tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ;
1791 pos=tsPrefix.find(fileNameStr) ;
1794 const std::vector<CField*> allFields = file->
getAllFields();
1795 for (
size_t j = 0; j < allFields.size(); j++)
1797 CField* field = allFields[j];
1799 if (!field->ts_enabled.isEmpty() && field->ts_enabled)
1805 for (
size_t k = 0; k < fileVars.size(); k++)
1809 tsFile->name = tsPrefix +
"_";
1810 if (!field->name.isEmpty())
1811 tsFile->name.
get() += field->name;
1812 else if (field->hasDirectFieldReference())
1813 tsFile->name.
get() += field->field_ref;
1815 tsFile->name.
get() += field->
getId();
1817 if (!field->ts_split_freq.isEmpty())
1818 tsFile->split_freq = field->ts_split_freq;
1821 tsField->field_ref = field->
getId();
1824 for (
size_t k = 0; k < fieldVars.size(); k++)
1825 tsField->getVirtualVariableGroup()->addChild(fieldVars[k]);
1828 for (
size_t k = 0; k < vars.size(); k++)
1833 if (var->ts_target.isEmpty()
1834 || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)
1835 tsField->getVirtualVariableGroup()->addChild(var);
1838 if (!var->ts_target.isEmpty()
1839 && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both))
1845 if (file->timeseries == CFile::timeseries_attr::exclusive)
1846 field->enabled =
false;
1851 if (file->timeseries == CFile::timeseries_attr::only)
1852 file->enabled =
false;
1862 std::set<StdString> gridIds;
1863 int sizeFile = activeFiles.size();
1864 CFile* filePtr(NULL);
1867 for (
int i = 0; i < sizeFile; ++i)
1869 filePtr = activeFiles[i];
1871 int sizeField = enabledFields.size();
1872 for (
int numField = 0; numField < sizeField; ++numField)
1874 if (0 != enabledFields[numField]->getRelGrid())
1875 gridIds.insert(
CGrid::get(enabledFields[numField]->getRelGrid())->getId());
1880 StdString gridDefRoot(
"grid_definition");
1881 CGridGroup* gridPtr = CGridGroup::get(gridDefRoot);
1882 std::set<StdString>::const_iterator it, itE = gridIds.end();
1883 for (it = gridIds.begin(); it != itE; ++it)
1885 gridPtr->sendCreateChild(*it);
1898 std::set<StdString> domainIds, axisIds, scalarIds;
1901 int numEnabledFiles = activeFiles.size();
1902 for (
int i = 0; i < numEnabledFiles; ++i)
1904 std::vector<CField*> enabledFields = activeFiles[i]->getEnabledFields();
1905 int numEnabledFields = enabledFields.size();
1906 for (
int j = 0; j < numEnabledFields; ++j)
1908 const std::vector<StdString>& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds();
1909 if (
"" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]);
1910 if (
"" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]);
1911 if (
"" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]);
1916 std::set<StdString>::iterator itDom, itAxis, itScalar;
1917 std::set<StdString>::const_iterator itE;
1919 StdString scalarDefRoot(
"scalar_definition");
1920 CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot);
1921 itE = scalarIds.end();
1922 for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar)
1924 if (!itScalar->empty())
1926 scalarPtr->sendCreateChild(*itScalar);
1931 StdString axiDefRoot(
"axis_definition");
1932 CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot);
1933 itE = axisIds.end();
1934 for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis)
1936 if (!itAxis->empty())
1938 axisPtr->sendCreateChild(*itAxis);
1944 StdString domDefRoot(
"domain_definition");
1945 CDomainGroup* domPtr = CDomainGroup::get(domDefRoot);
1946 itE = domainIds.end();
1947 for (itDom = domainIds.begin(); itDom != itE; ++itDom)
1949 if (!itDom->empty()) {
1950 domPtr->sendCreateChild(*itDom);
1961 int prevStep = calendar->getStep();
1963 if (prevStep < step)
1965 if (hasClient && !hasServer)
1967 doPreTimestepOperationsForEnabledReadModeFiles();
1970 info(50) <<
"updateCalendar : before : " << calendar->getCurrentDate() << endl;
1971 calendar->update(step);
1972 info(50) <<
"updateCalendar : after : " << calendar->getCurrentDate() << endl;
1973 #ifdef XIOS_MEMTRACK_LIGHT
1974 info(50) <<
" Current memory used by XIOS : "<< MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<
" Mbyte, at timestep "<<step<<
" of context "<<this->getId()<<endl ;
1977 if (hasClient && !hasServer)
1979 doPostTimestepOperationsForEnabledReadModeFiles();
1980 garbageCollector.invalidate(calendar->getCurrentDate());
1983 else if (prevStep == step)
1984 info(50) <<
"updateCalendar: already at step " << step <<
", no operation done." << endl;
1986 ERROR(
"void CContext::updateCalendar(int step)",
1987 <<
"Illegal calendar update: previous step was " << prevStep <<
", new step " << step <<
"is in the past!")
1994 vector<CFile*>::const_iterator it;
2007 vector<CFile*>::const_iterator it;
2048 CContext* context = CObjectFactory::CreateObject<CContext>(id).
get();
2052 #define DECLARE_NODE(Name_, name_) \
2053 C##Name_##Definition::create(C##Name_##Definition::GetDefName());
2054 #define DECLARE_NODE_PAR(Name_, name_)
2068 get(id)->recvRegistry(*buffer);
2075 if (server->intraCommRank==0)
2079 registryOut->mergeRegistry(registry) ;
2091 for (
int i = 0; i < nbSrvPools; ++i)
2104 for (std::list<int>::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank)
2105 event.
push(*itRank,1,msg);
2108 else contextClientTmp->
sendEvent(event);
2127 str.append(
"enabled files=\"");
2129 for (
int i = 0; i < size; ++i)
void buildFilterGraphOfEnabledFields()
static bool isClient
Check if xios is client.
static const size_t headerSize
void solveAllRefOfFieldsWithReadAccess()
void updateCalendar(int step)
Update calendar in each time step.
CATCH CDomainAlgorithmReorder::CDomainAlgorithmReorder(CDomain *domainDestination, CDomain *domainSource, CReorderDomain *reorderDomain if)(domainDestination->type!=CDomain::type_attr::rectilinear)
void sendEvent(CEventClient &event)
In case of attached mode, the current context must be reset to context for client.
void sendAllAxis()
Send all attributes of axis from client to server.
void sendEnabledFieldsInFiles(const std::vector< CFile * > &activeFiles)
Client side: Send information about the active fields (the ones that are written to files)
std::vector< CFile * > enabledFiles
const StdString getFileOutputName(void) const
Accesseurs ///.
CContext(void)
Constructeurs ///.
void sendCreateFileHeader(void)
Client side: Send a message to create header part of netcdf file.
std::vector< CField * > getAllFields(void) const
Get all fields of a file.
void freeComms(void)
Free internally allocated communicators.
const StdString & getIdServer()
std::vector< CVariable * > getAllVariables(void) const
Get all variables of a file.
void sendRefGrid(const std::vector< CFile * > &activeFiles)
Client side: Send information of reference grid of active fields.
void readAttributesOfEnabledFieldsInReadModeFiles()
void findEnabledWriteModeFiles(void)
Find all files in write mode.
void sendAllScalars()
Send all attributes of scalars from client to server.
std::list< MPI_Comm > comms
Communicators allocated internally.
CContextClient * getContextClient()
void releaseBuffers(void)
Release all buffers.
void findEnabledFiles(void)
void solveOnlyRefOfEnabledFields(bool sendToServer)
Go up the hierarchical tree via field_ref and check the attributes of fields. This can be done in a c...
static void SetCurrentContextId(const StdString &context)
Mutateurs ///.
static void recvPostProcessing(CEventServer &event)
Server side: Receive a message to do some post processing.
void postProcessFilterGraph()
bool eventLoop(bool enableEventsProcessing=true)
void initServer(MPI_Comm intraComm, MPI_Comm interComm, CContext *cxtClient=0)
static bool has(const string &id)
static double bufferSizeFactor
Factor used to tune the buffer size.
static bool isOptPerformance
Check if buffer size is for performance (as large as possible)
std::vector< CContextClient * > clientPrimServer
CField * addField(const string &id="")
Add a field into file.
std::shared_ptr< CCalendar > getCalendar(void) const
Accesseurs ///.
CContextServer * server
Concrete context server.
#define UNUSED(parameter)
Macro ///.
void sendAllAttributesToServer()
void startPrefetchingOfEnabledReadModeFiles()
static T * get(const string &id)
static ENodeType GetType(void)
void postProcessing()
Do some simple post-processing after parsing the xml file. After the xml file (iodef.xml) is parsed...
static const StdString & GetRootName(void)
Accesseurs statiques ///.
const CDuration NoneDu(0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0)
static void recvCloseDefinition(CEventServer &event)
Server side: Receive a message of client announcing a context close.
void initClient(MPI_Comm intraComm, MPI_Comm interComm, CContext *cxtServer=0)
static StdString GetName(void)
Accesseurs statiques ///.
static void recvRegistry(CEventServer &event)
Server side: Receive a message to do some post processing.
void checkGridEnabledFields()
std::vector< CField * > getEnabledFields(int default_outputlevel=5, int default_level=1, bool default_enabled=true)
Get all enabled fields of a file. A field is considered to be enabled if it fulfils these conditions: it...
static void recvCreateFileHeader(CEventServer &event)
Server side: Receive a message from the client announcing the creation of the header part of a netcdf file...
void findEnabledReadModeFiles(void)
Find all files in read mode.
void checkAxisDomainsGridsEligibilityForCompressedOutput()
Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output...
const std::list< int > & getRanksServerLeader(void) const
Get leading server in the group of connected server.
std::map< int, StdSize > getAttributesBufferSize(std::map< int, StdSize > &maxEventSize, CContextClient *contextClient, bool bufferForWriting=false)
Compute the required buffer size to send the attributes (mostly those grid related).
const StdString & getId(void) const
Accesseurs ///.
void buildFilterGraphOfFieldsWithReadAccess()
long long int Time
////////////////////// Déclarations ////////////////////// ///
static CRegistry * globalRegistry
Global registry which is written by the root process of the servers
ifstream f(fileToReadWrite_.c_str())
static StdString GetDefName(void)
////////////////////// Déclarations ////////////////////// ///
virtual void solveDescInheritance(bool apply, const CAttributeMap *const parent=0)
Find all inheritance among objects in a context.
#define DECLARE_NODE_PAR(Name_, name_)
#define DECLARE_NODE(Name_, name_)
static void AddChild(std::shared_ptr< U > group, std::shared_ptr< typename U::RelChild > child)
static void setCurrent(const string &id)
Set context with an id be the current context.
void finalize(void)
Finalize context client and do some reports.
void distributeFileOverMemoryBandwith()
virtual ~CContext(void)
Destructeur ///.
void mergeRegistry(const CRegistry &inRegistry)
static void ShowTree(StdOStream &out=std::clog)
Show tree structure of context.
static void CleanTree(void)
void sendPostProcessing()
Client side: Send a message to do some post processing on server.
void solveAllInheritance(bool apply=true)
static void SetCurrentContextId(const StdString &context)
Mutateurs ///.
bool isFinalized(void)
Context is finalized if it received context post finalize event.
std::vector< CFile * > enabledReadModeFiles
CCalendarWrapper * get(void)
void setCalendar(std::shared_ptr< CCalendar > newCalendar)
Mutateurs ///.
vector< CVariable * > getAllVariables(void) const
virtual bool hasChild(void) const
Verify if all root definition in the context have child.
void distributeFiles(void)
This class is a registry database which stores keys with associated values.
CATCH CScalarAlgorithmReduceScalar::CScalarAlgorithmReduceScalar(CScalar *scalarDestination, CScalar *scalarSource, CReduceScalarToScalar *algo ERROR)("CScalarAlgorithmReduceScalar::CScalarAlgorithmReduceScalar(CScalar* scalarDestination, CScalar* scalarSource, CReduceScalarToScalar* algo)",<< "Operation must be defined."<< "Scalar source "<< scalarSource->getId()<< std::endl<< "Scalar destination "<< scalarDestination->getId())
static void recvUpdateCalendar(CEventServer &event)
Server side: Receive a message from a client announcing a calendar update.
std::map< int, size_t > getMinimumBufferSizeForAttributes(CContextClient *client)
StdString dumpClassAttributes(void)
A context can be both on client and on server side.
static void ParseInclude(StdIStream &stream, const string &fluxId, T &object)
std::map< StdString, StdString > THashAttributes
////////////////////// Déclarations ////////////////////// ///
virtual StdString toString(void) const
Convert context object into string (to print)
////////////////////// Déclarations ////////////////////// ///
void setClientServerBuffer(CContextClient *contextClient, bool bufferForWriting=false)
Sets client buffers.
void push(int rank, int nbSender, CMessage &msg)
std::shared_ptr< CCalendar > calendar
static CContext * create(const string &id="")
Create a context with specific id.
void doPostTimestepOperationsForEnabledReadModeFiles()
void closeDefinition(void)
Close all the context definitions and process the data. After everything is well defined on client sid...
bool isInitialized(void)
Verify whether a context is initialized.
static StdSize minBufferSize
Minimum buffer size.
std::map< int, StdSize > getDataBufferSize(std::map< int, StdSize > &maxEventSize, CContextClient *contextClient, bool bufferForWriting=false)
Compute the required buffer size to send the fields data.
void distributeFileOverServer2(int nbServers, int nGrids, SDistGrid *grids, int nFiles, SDistFile *files)
int clientRank
Rank of current client.
void createCalendar(void)
Try to create the calendar from the parsed attributes.
CLog error("error", cerr.rdbuf())
std::vector< CField * > fieldsWithReadAccess
static void recvProcessingGridOfEnabledFields(CEventServer &event)
Server side: Receive a message to do some post processing.
std::vector< CContextServer * > serverPrimServer
std::vector< CFile * > enabledWriteModeFiles
void distributeFileOverBandwith()
virtual void parse(xml::CXMLNode &node)
static bool dispatchEvent(CEventServer &event)
Deallocate buffers allocated by clientContexts.
static const vector< CField * > getAll()
static bool isServer
Check if xios is server.
static CContextGroup * getRoot(void)
Get context group (context root)
bool havePendingRequests(void)
void sendEnabledFiles(const std::vector< CFile * > &activeFiles)
Client side: Send information about active files (files that are enabled to write out).
static CFile * create(const string &id=string(""))
void solveAllRefOfEnabledFieldsAndTransform(bool sendToServer)
Go up the hierarchical tree via field_ref and check the attributes of fields.
void sendAddAllVariables(CContextClient *client)
Send messages to duplicate all variables on server side Because each variable has also its attributes...
static StdSize maxBufferSize
Maximum buffer size.
void createFileHeader(void)
Server side: Create header of netcdf file.
void sendRefDomainsAxisScalars(const std::vector< CFile * > &activeFiles)
Client side: Send information of reference domain, axis and scalar of active fields.
enum xios::_node_type ENodeType
////////////////////// Définitions ////////////////////// ///
void findAllEnabledFieldsInFiles(const std::vector< CFile * > &activeFiles)
static std::shared_ptr< CContextGroup > root
void sendCloseDefinition(void)
Client side: Send a message to server to make it close.
static CTimer & get(std::string name)
void sendProcessingGridOfEnabledFields()
After being gathered to the root process of the context, the merged registry is sent to the root process of ...
void sendPostProcessingGlobalAttributes()
void checkGridEnabledFieldsInFiles(const std::vector< CFile * > &activeFiles)
Check grid of active (enabled) fields in file.
std::shared_ptr< T > getShared(void)
bool fromBuffer(CBufferIn &buffer)
CGarbageCollector garbageCollector
void sendGridEnabledFieldsInFiles(const std::vector< CFile * > &activeFiles)
Send active (enabled) fields in file from a client to others.
void sendUpdateCalendar(int step)
Client side: Send a message to update calendar in each time step.
ENodeType getType(void) const
Accesseurs ///.
void findFieldsWithReadAccess(void)
std::ostringstream StdOStringStream
Définition de types (issus de la bibliothèque standard)///.
std::ifstream StdIFStream
void duplicateAttributes(const CAttributeMap *const _parent)
Duplicate attribute map with a specific attribute map.
virtual void parse(xml::CXMLNode &node)
Parse xml file and write information into context object.
void hierarchicalGatherRegistry(void)
void sendAllDomains()
Send all attributes of domains from client to server.
void releaseClientBuffers(void)
Deallocate buffers allocated by clientContexts.
void sendGridComponentEnabledFieldsInFiles(const std::vector< CFile * > &activeFiles)
int countChildCtx_
Counter of child contexts (for now it is the number of secondary server pools)
static StdString GetDefName(void)
CRegistry * registryOut
Output registry which will be written to file at finalization.
CVariableGroup * getVirtualVariableGroup(void) const
Get virtual variable group In each file, there always exists a variable group which is the ancestor o...
void solveFieldRefInheritance(bool apply)
void doPreTimestepOperationsForEnabledReadModeFiles()
static void recvPostProcessingGlobalAttributes(CEventServer &event)
bool checkBuffersAndListen(bool enableEventsProcessing=true)
Try to send the buffers and receive possible answers.
void finalize(void)
Terminate a context.
static CContext * getCurrent(void)
Get current context.
CContextClient * client
Concrete context client.
bool isServerLeader(void) const
Check if client connects to leading server.
static StdString & GetCurrentContextId(void)
Accesseurs ///.
bool checkBuffers(list< int > &ranks)
Verify state of buffers corresponding to a connection.
void postProcessingGlobalAttributes()
void prepareTimeseries(void)
Client side: Prepare the timeseries by adding the necessary files.