#include "context.hpp" #include "attribute_template.hpp" #include "object_template.hpp" #include "group_template.hpp" #include "calendar_type.hpp" #include "duration.hpp" #include "context_client.hpp" #include "context_server.hpp" #include "nc4_data_output.hpp" #include "node_type.hpp" #include "message.hpp" #include "type.hpp" #include "xios_spl.hpp" #include "timer.hpp" #include "memtrack.hpp" #include #include #include "server.hpp" #include "distribute_file_server2.hpp" using namespace ep_lib; namespace xios { boost::shared_ptr * CContext::root_ptr = 0; /// ////////////////////// Définitions ////////////////////// /// CContext::CContext(void) : CObjectTemplate(), CContextAttributes() , calendar(), hasClient(false), hasServer(false) , isPostProcessed(false), finalized(false) , idServer_(), client(0), server(0) , allProcessed(false), countChildCtx_(0) { /* Ne rien faire de plus */ } CContext::CContext(const StdString & id) : CObjectTemplate(id), CContextAttributes() , calendar(), hasClient(false), hasServer(false) , isPostProcessed(false), finalized(false) , idServer_(), client(0), server(0) , allProcessed(false), countChildCtx_(0) { /* Ne rien faire de plus */ } CContext::~CContext(void) { delete client; delete server; for (std::vector::iterator it = clientPrimServer.begin(); it != clientPrimServer.end(); it++) delete *it; for (std::vector::iterator it = serverPrimServer.begin(); it != serverPrimServer.end(); it++) delete *it; } //---------------------------------------------------------------- //! Get name of context StdString CContext::GetName(void) { return (StdString("context")); } StdString CContext::GetDefName(void){ return (CContext::GetName()); } ENodeType CContext::GetType(void) { return (eContext); } //---------------------------------------------------------------- /*! \brief Get context group (context root) \return Context root */ CContextGroup* CContext::getRoot(void) { if(root_ptr == 0) root_ptr = new boost::shared_ptr(new CContextGroup(xml::CXMLNode::GetRootName())); return root_ptr->get(); } //---------------------------------------------------------------- /*! \brief Get calendar of a context \return Calendar */ boost::shared_ptr CContext::getCalendar(void) const { return (this->calendar); } //---------------------------------------------------------------- /*! \brief Set a context with a calendar \param[in] newCalendar new calendar */ void CContext::setCalendar(boost::shared_ptr newCalendar) { this->calendar = newCalendar; } //---------------------------------------------------------------- /*! \brief Parse xml file and write information into context object \param [in] node xmld node corresponding in xml file */ void CContext::parse(xml::CXMLNode & node) { CContext::SuperClass::parse(node); // PARSING POUR GESTION DES ENFANTS xml::THashAttributes attributes = node.getAttributes(); if (attributes.end() != attributes.find("src")) { StdIFStream ifs ( attributes["src"].c_str() , StdIFStream::in ); if ( (ifs.rdstate() & std::ifstream::failbit ) != 0 ) ERROR("void CContext::parse(xml::CXMLNode & node)", < file" ); if (!ifs.good()) ERROR("CContext::parse(xml::CXMLNode & node)", << "[ filename = " << attributes["src"] << " ] Bad xml stream !"); xml::CXMLParser::ParseInclude(ifs, attributes["src"], *this); } if (node.getElementName().compare(CContext::GetName())) DEBUG("Le noeud is wrong defined but will be considered as a context !"); if (!(node.goToChildElement())) { DEBUG("Le context ne contient pas d'enfant !"); } else { do { // Parcours des contextes pour traitement. StdString name = node.getElementName(); attributes.clear(); attributes = node.getAttributes(); if (attributes.end() != attributes.find("id")) { DEBUG(<< "Definition node has an id," << "it will not be taking account !"); } #define DECLARE_NODE(Name_, name_) \ if (name.compare(C##Name_##Definition::GetDefName()) == 0) \ { C##Name_##Definition::create(C##Name_##Definition::GetDefName()) -> parse(node); continue; } #define DECLARE_NODE_PAR(Name_, name_) #include "node_type.conf" DEBUG(<< "The element \'" << name << "\' in the context \'" << CContext::getCurrent()->getId() << "\' is not a definition !"); } while (node.goToNextElement()); node.goToParentElement(); // Retour au parent } } //---------------------------------------------------------------- //! Show tree structure of context void CContext::ShowTree(StdOStream & out) { StdString currentContextId = CContext::getCurrent() -> getId(); std::vector def_vector = CContext::getRoot()->getChildList(); std::vector::iterator it = def_vector.begin(), end = def_vector.end(); out << "" << std::endl; out << "<" << xml::CXMLNode::GetRootName() << " >" << std::endl; for (; it != end; it++) { CContext* context = *it; CContext::setCurrent(context->getId()); out << *context << std::endl; } out << "" << std::endl; CContext::setCurrent(currentContextId); } //---------------------------------------------------------------- //! Convert context object into string (to print) StdString CContext::toString(void) const { StdOStringStream oss; oss << "<" << CContext::GetName() << " id=\"" << this->getId() << "\" " << SuperClassAttribute::toString() << ">" << std::endl; if (!this->hasChild()) { //oss << "" << std::endl; // fait planter l'incrémentation } else { #define DECLARE_NODE(Name_, name_) \ if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \ oss << * C##Name_##Definition::get(C##Name_##Definition::GetDefName()) << std::endl; #define DECLARE_NODE_PAR(Name_, name_) #include "node_type.conf" } oss << ""; return (oss.str()); } //---------------------------------------------------------------- /*! \brief Find all inheritace among objects in a context. \param [in] apply (true) write attributes of parent into ones of child if they are empty (false) write attributes of parent into a new container of child \param [in] parent unused */ void CContext::solveDescInheritance(bool apply, const CAttributeMap * const UNUSED(parent)) { #define DECLARE_NODE(Name_, name_) \ if (C##Name_##Definition::has(C##Name_##Definition::GetDefName())) \ C##Name_##Definition::get(C##Name_##Definition::GetDefName())->solveDescInheritance(apply); #define DECLARE_NODE_PAR(Name_, name_) #include "node_type.conf" } //---------------------------------------------------------------- //! Verify if all root definition in the context have child. bool CContext::hasChild(void) const { return ( #define DECLARE_NODE(Name_, name_) \ C##Name_##Definition::has(C##Name_##Definition::GetDefName()) || #define DECLARE_NODE_PAR(Name_, name_) #include "node_type.conf" false); } //---------------------------------------------------------------- void CContext::CleanTree(void) { #define DECLARE_NODE(Name_, name_) C##Name_##Definition::ClearAllAttributes(); #define DECLARE_NODE_PAR(Name_, name_) #include "node_type.conf" } ///--------------------------------------------------------------- //! Initialize client side void CContext::initClient(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtServer /*= 0*/) { hasClient = true; ep_lib::MPI_Comm intraCommServer, interCommServer; //if (CServer::serverLevel != 1) if (CServer::serverLevel == 0) // initClient is called by client { client = new CContextClient(this, intraComm, interComm, cxtServer); if (cxtServer) // Attached mode { intraCommServer = intraComm; interCommServer = interComm; } else { ep_lib::MPI_Comm_dup(intraComm, &intraCommServer); comms.push_back(intraCommServer); ep_lib::MPI_Comm_dup(interComm, &interCommServer); comms.push_back(interCommServer); } /* for registry take the id of client context */ /* for servers, supress the _server_ from id */ string contextRegistryId=getId() ; size_t pos=contextRegistryId.find("_server_") ; if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ; registryIn=new CRegistry(intraComm); registryIn->setPath(contextRegistryId) ; if (client->clientRank==0) registryIn->fromFile("xios_registry.bin") ; registryIn->bcastRegistry() ; registryOut=new CRegistry(intraComm) ; registryOut->setPath(contextRegistryId) ; server = new CContextServer(this, intraCommServer, interCommServer); } else if(CServer::serverLevel == 1) // initClient is called by primary server { clientPrimServer.push_back(new CContextClient(this, intraComm, interComm)); ep_lib::MPI_Comm_dup(intraComm, &intraCommServer); comms.push_back(intraCommServer); ep_lib::MPI_Comm_dup(interComm, &interCommServer); comms.push_back(interCommServer); serverPrimServer.push_back(new CContextServer(this, intraCommServer, interCommServer)); } } /*! Sets client buffers. \param [in] contextClient \param [in] bufferForWriting True if buffers are used for sending data for writing This flag is only true for client and server-1 for communication with server-2 */ void CContext::setClientServerBuffer(CContextClient* contextClient, bool bufferForWriting) { // Estimated minimum event size for small events (10 is an arbitrary constant just for safety) const size_t minEventSize = CEventClient::headerSize + getIdServer().size() + 10 * sizeof(int); // Ensure there is at least some room for 20 of such events in the buffers size_t minBufferSize = std::max(CXios::minBufferSize, 20 * minEventSize); #define DECLARE_NODE(Name_, name_) \ if (minBufferSize < sizeof(C##Name_##Definition)) minBufferSize = sizeof(C##Name_##Definition); #define DECLARE_NODE_PAR(Name_, name_) #include "node_type.conf" #undef DECLARE_NODE #undef DECLARE_NODE_PAR // Compute the buffer sizes needed to send the attributes and data corresponding to fields std::map maxEventSize; std::map bufferSize = getAttributesBufferSize(maxEventSize, contextClient, bufferForWriting); std::map dataBufferSize = getDataBufferSize(maxEventSize, contextClient, bufferForWriting); std::map::iterator it, ite = dataBufferSize.end(); for (it = dataBufferSize.begin(); it != ite; ++it) if (it->second > bufferSize[it->first]) bufferSize[it->first] = it->second; // Apply the buffer size factor, check that we are above the minimum buffer size and below the maximum size ite = bufferSize.end(); for (it = bufferSize.begin(); it != ite; ++it) { it->second *= CXios::bufferSizeFactor; if (it->second < minBufferSize) it->second = minBufferSize; if (it->second > CXios::maxBufferSize) it->second = CXios::maxBufferSize; } // Leaders will have to send some control events so ensure there is some room for those in the buffers if (contextClient->isServerLeader()) { const std::list& ranks = contextClient->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) { if (!bufferSize.count(*itRank)) { bufferSize[*itRank] = minBufferSize; maxEventSize[*itRank] = minEventSize; } } } contextClient->setBufferSize(bufferSize, maxEventSize); } //! Verify whether a context is initialized bool CContext::isInitialized(void) { return hasClient; } void CContext::initServer(ep_lib::MPI_Comm intraComm, ep_lib::MPI_Comm interComm, CContext* cxtClient /*= 0*/) { hasServer=true; server = new CContextServer(this,intraComm,interComm); /* for registry take the id of client context */ /* for servers, supress the _server_ from id */ string contextRegistryId=getId() ; size_t pos=contextRegistryId.find("_server_") ; if (pos!=std::string::npos) contextRegistryId=contextRegistryId.substr(0,pos) ; registryIn=new CRegistry(intraComm); registryIn->setPath(contextRegistryId) ; if (server->intraCommRank==0) registryIn->fromFile("xios_registry.bin") ; registryIn->bcastRegistry() ; registryOut=new CRegistry(intraComm) ; registryOut->setPath(contextRegistryId) ; ep_lib::MPI_Comm intraCommClient, interCommClient; if (cxtClient) // Attached mode { intraCommClient = intraComm; interCommClient = interComm; } else { ep_lib::MPI_Comm_dup(intraComm, &intraCommClient); comms.push_back(intraCommClient); ep_lib::MPI_Comm_dup(interComm, &interCommClient); comms.push_back(interCommClient); } client = new CContextClient(this,intraCommClient,interCommClient, cxtClient); } //! Try to send the buffers and receive possible answers bool CContext::checkBuffersAndListen(bool enableEventsProcessing /*= true*/) { bool clientReady, serverFinished; // Only classical servers are non-blocking if (CServer::serverLevel == 0) { client->checkBuffers(); bool hasTmpBufferedEvent = client->hasTemporarilyBufferedEvent(); if (hasTmpBufferedEvent) hasTmpBufferedEvent = !client->sendTemporarilyBufferedEvent(); // Don't process events if there is a temporarily buffered event return server->eventLoop(!hasTmpBufferedEvent || !enableEventsProcessing); } else if (CServer::serverLevel == 1) { if (!finalized) client->checkBuffers(); bool serverFinished = true; if (!finalized) serverFinished = server->eventLoop(enableEventsProcessing); bool serverPrimFinished = true; for (int i = 0; i < clientPrimServer.size(); ++i) { if (!finalized) clientPrimServer[i]->checkBuffers(); if (!finalized) serverPrimFinished *= serverPrimServer[i]->eventLoop(enableEventsProcessing); } return ( serverFinished && serverPrimFinished); } else if (CServer::serverLevel == 2) { client->checkBuffers(); return server->eventLoop(enableEventsProcessing); } } //! Terminate a context void CContext::finalize(void) { if (hasClient && !hasServer) // For now we only use server level 1 to read data { doPreTimestepOperationsForEnabledReadModeFiles(); } // Send registry upon calling the function the first time if (countChildCtx_ == 0) if (hasClient) sendRegistry() ; // Client: // (1) blocking send context finalize to its server // (2) blocking receive context finalize from its server // (3) some memory deallocations if (CXios::isClient) { // Make sure that client (model) enters the loop only once if (countChildCtx_ < 1) { ++countChildCtx_; client->finalize(); while (client->havePendingRequests()) client->checkBuffers(); while (!server->hasFinished()) server->eventLoop(); if (hasServer) // Mode attache { closeAllFile(); registryOut->hierarchicalGatherRegistry() ; if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; } //! Deallocate client buffers client->releaseBuffers(); //! Free internally allocated communicators for (std::list::iterator it = comms.begin(); it != comms.end(); ++it) ep_lib::MPI_Comm_free(&(*it)); comms.clear(); #pragma omp critical (_output) info(20)<<"CContext: Context <"< is finalized."<finalize(); // (Last) context finalized message received if (countChildCtx_ == clientPrimServer.size()) { // Blocking send of context finalize message to its client (e.g. primary server or model) #pragma omp critical (_output) info(100)<<"DEBUG: context "<finalize(); bool bufferReleased; do { client->checkBuffers(); bufferReleased = !client->havePendingRequests(); } while (!bufferReleased); finalized = true; closeAllFile(); // Just move to here to make sure that server-level 1 can close files if (hasServer && !hasClient) { registryOut->hierarchicalGatherRegistry() ; if (server->intraCommRank==0) CXios::globalRegistry->mergeRegistry(*registryOut) ; } //! Deallocate client buffers client->releaseBuffers(); for (int i = 0; i < clientPrimServer.size(); ++i) clientPrimServer[i]->releaseBuffers(); //! Free internally allocated communicators for (std::list::iterator it = comms.begin(); it != comms.end(); ++it) ep_lib::MPI_Comm_free(&(*it)); comms.clear(); #pragma omp critical (_output) info(20)<<"CContext: Context <"< is finalized."<::iterator it = comms.begin(); it != comms.end(); ++it) ep_lib::MPI_Comm_free(&(*it)); comms.clear(); } //! Deallocate buffers allocated by clientContexts void CContext::releaseClientBuffers(void) { client->releaseBuffers(); for (int i = 0; i < clientPrimServer.size(); ++i) clientPrimServer[i]->releaseBuffers(); } void CContext::postProcessingGlobalAttributes() { if (allProcessed) return; // After xml is parsed, there are some more works with post processing postProcessing(); // Check grid and calculate its distribution checkGridEnabledFields(); // Distribute files between secondary servers according to the data size distributeFiles(); setClientServerBuffer(client, (hasClient && !hasServer)); for (int i = 0; i < clientPrimServer.size(); ++i) setClientServerBuffer(clientPrimServer[i], true); if (hasClient) { // Send all attributes of current context to server this->sendAllAttributesToServer(); // Send all attributes of current calendar CCalendarWrapper::get(CCalendarWrapper::GetDefName())->sendAllAttributesToServer(); // We have enough information to send to server // First of all, send all enabled files sendEnabledFiles(this->enabledWriteModeFiles); // We only use server-level 1 (for now) to read data if (!hasServer) sendEnabledFiles(this->enabledReadModeFiles); // Then, send all enabled fields sendEnabledFieldsInFiles(this->enabledWriteModeFiles); if (!hasServer) sendEnabledFieldsInFiles(this->enabledReadModeFiles); // Then, check whether we have domain_ref, axis_ref or scalar_ref attached to the enabled fields // If any, so send them to server sendRefDomainsAxisScalars(this->enabledWriteModeFiles); if (!hasServer) sendRefDomainsAxisScalars(this->enabledReadModeFiles); // Check whether enabled fields have grid_ref, if any, send this info to server sendRefGrid(this->enabledFiles); // This code may be useful in the future when we want to seperate completely read and write // sendRefGrid(this->enabledWriteModeFiles); // if (!hasServer) // sendRefGrid(this->enabledReadModeFiles); // A grid of enabled fields composed of several components which must be checked then their // checked attributes should be sent to server sendGridComponentEnabledFieldsInFiles(this->enabledFiles); // This code can be seperated in two (one for reading, another for writing) // We have a xml tree on the server side and now, it should be also processed sendPostProcessing(); // Finally, we send information of grid itself to server sendGridEnabledFieldsInFiles(this->enabledWriteModeFiles); if (!hasServer) sendGridEnabledFieldsInFiles(this->enabledReadModeFiles); } allProcessed = true; } void CContext::sendPostProcessingGlobalAttributes() { // Use correct context client to send message // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; for (int i = 0; i < nbSrvPools; ++i) { CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client; CEventClient event(getType(),EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES); if (contextClientTmp->isServerLeader()) { CMessage msg; if (hasServer) msg<getIdServer(i); else msg<getIdServer(); const std::list& ranks = contextClientTmp->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank,1,msg); contextClientTmp->sendEvent(event); } else contextClientTmp->sendEvent(event); } } void CContext::recvPostProcessingGlobalAttributes(CEventServer& event) { CBufferIn* buffer=event.subEvents.begin()->buffer; string id; *buffer>>id; get(id)->recvPostProcessingGlobalAttributes(*buffer); } void CContext::recvPostProcessingGlobalAttributes(CBufferIn& buffer) { postProcessingGlobalAttributes(); } /*! \brief Close all the context defintion and do processing data After everything is well defined on client side, they will be processed and sent to server From the version 2.0, sever and client work no more on the same database. Moreover, client(s) will send all necessary information to server, from which each server can build its own database. Because the role of server is to write out field data on a specific netcdf file, the only information that it needs is the enabled files and the active fields (fields will be written onto active files) */ void CContext::closeDefinition(void) { CTimer::get("Context : close definition").resume() ; postProcessingGlobalAttributes(); if (hasClient) sendPostProcessingGlobalAttributes(); // There are some processings that should be done after all of above. For example: check mask or index this->buildFilterGraphOfEnabledFields(); if (hasClient && !hasServer) { buildFilterGraphOfFieldsWithReadAccess(); postProcessFilterGraph(); } checkGridEnabledFields(); if (hasClient) this->sendProcessingGridOfEnabledFields(); if (hasClient) this->sendCloseDefinition(); // Nettoyage de l'arborescence if (hasClient) CleanTree(); // Only on client side?? if (hasClient) { sendCreateFileHeader(); if (!hasServer) startPrefetchingOfEnabledReadModeFiles(); } CTimer::get("Context : close definition").suspend() ; } void CContext::findAllEnabledFieldsInFiles(const std::vector& activeFiles) { for (unsigned int i = 0; i < activeFiles.size(); i++) (void)activeFiles[i]->getEnabledFields(); } void CContext::readAttributesOfEnabledFieldsInReadModeFiles() { for (unsigned int i = 0; i < this->enabledReadModeFiles.size(); ++i) (void)this->enabledReadModeFiles[i]->readAttributesOfEnabledFieldsInReadMode(); } void CContext::sendGridComponentEnabledFieldsInFiles(const std::vector& activeFiles) { int size = activeFiles.size(); for (int i = 0; i < size; ++i) { activeFiles[i]->sendGridComponentOfEnabledFields(); } } /*! Send active (enabled) fields in file from a client to others \param [in] activeFiles files contains enabled fields to send */ void CContext::sendGridEnabledFieldsInFiles(const std::vector& activeFiles) { int size = activeFiles.size(); for (int i = 0; i < size; ++i) { activeFiles[i]->sendGridOfEnabledFields(); } } void CContext::checkGridEnabledFields() { int size = enabledFiles.size(); for (int i = 0; i < size; ++i) { enabledFiles[i]->checkGridOfEnabledFields(); } } /*! Check grid of active (enabled) fields in file \param [in] activeFiles files contains enabled fields whose grid needs checking */ void CContext::checkGridEnabledFieldsInFiles(const std::vector& activeFiles) { int size = activeFiles.size(); for (int i = 0; i < size; ++i) { activeFiles[i]->checkGridOfEnabledFields(); } } /*! Go up the hierachical tree via field_ref and do check of attributes of fields This can be done in a client then all computed information will be sent from this client to others \param [in] sendToServer Flag to indicate whether calculated information will be sent */ void CContext::solveOnlyRefOfEnabledFields(bool sendToServer) { int size = this->enabledFiles.size(); for (int i = 0; i < size; ++i) { this->enabledFiles[i]->solveOnlyRefOfEnabledFields(sendToServer); } for (int i = 0; i < size; ++i) { this->enabledFiles[i]->generateNewTransformationGridDest(); } } /*! Go up the hierachical tree via field_ref and do check of attributes of fields. The transformation can be done in this step. All computed information will be sent from this client to others. \param [in] sendToServer Flag to indicate whether calculated information will be sent */ void CContext::solveAllRefOfEnabledFieldsAndTransform(bool sendToServer) { int size = this->enabledFiles.size(); for (int i = 0; i < size; ++i) { this->enabledFiles[i]->solveAllRefOfEnabledFieldsAndTransform(sendToServer); } } void CContext::buildFilterGraphOfEnabledFields() { int size = this->enabledFiles.size(); for (int i = 0; i < size; ++i) { this->enabledFiles[i]->buildFilterGraphOfEnabledFields(garbageCollector); } } void CContext::postProcessFilterGraph() { int size = enabledFiles.size(); for (int i = 0; i < size; ++i) { enabledFiles[i]->postProcessFilterGraph(); } } void CContext::startPrefetchingOfEnabledReadModeFiles() { int size = enabledReadModeFiles.size(); for (int i = 0; i < size; ++i) { enabledReadModeFiles[i]->prefetchEnabledReadModeFields(); } } void CContext::doPreTimestepOperationsForEnabledReadModeFiles() { int size = enabledReadModeFiles.size(); for (int i = 0; i < size; ++i) { enabledReadModeFiles[i]->doPreTimestepOperationsForEnabledReadModeFields(); } } void CContext::doPostTimestepOperationsForEnabledReadModeFiles() { int size = enabledReadModeFiles.size(); for (int i = 0; i < size; ++i) { enabledReadModeFiles[i]->doPostTimestepOperationsForEnabledReadModeFields(); } } void CContext::findFieldsWithReadAccess(void) { fieldsWithReadAccess.clear(); const vector allFields = CField::getAll(); for (size_t i = 0; i < allFields.size(); ++i) { CField* field = allFields[i]; if (field->file && !field->file->mode.isEmpty() && field->file->mode == CFile::mode_attr::read) field->read_access = true; else if (!field->read_access.isEmpty() && field->read_access && (field->enabled.isEmpty() || field->enabled)) fieldsWithReadAccess.push_back(field); } } void CContext::solveAllRefOfFieldsWithReadAccess() { for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i) fieldsWithReadAccess[i]->solveAllReferenceEnabledField(false); } void CContext::buildFilterGraphOfFieldsWithReadAccess() { for (size_t i = 0; i < fieldsWithReadAccess.size(); ++i) fieldsWithReadAccess[i]->buildFilterGraph(garbageCollector, true); } void CContext::solveAllInheritance(bool apply) { // Résolution des héritages descendants (càd des héritages de groupes) // pour chacun des contextes. solveDescInheritance(apply); // Résolution des héritages par référence au niveau des fichiers. const vector allFiles=CFile::getAll(); const vector allGrids= CGrid::getAll(); if (hasClient && !hasServer) //if (hasClient) { for (unsigned int i = 0; i < allFiles.size(); i++) allFiles[i]->solveFieldRefInheritance(apply); } unsigned int vecSize = allGrids.size(); unsigned int i = 0; for (i = 0; i < vecSize; ++i) allGrids[i]->solveDomainAxisRefInheritance(apply); } void CContext::findEnabledFiles(void) { const std::vector allFiles = CFile::getAll(); const CDate& initDate = calendar->getInitDate(); for (unsigned int i = 0; i < allFiles.size(); i++) if (!allFiles[i]->enabled.isEmpty()) // Si l'attribut 'enabled' est défini. { if (allFiles[i]->enabled.getValue()) // Si l'attribut 'enabled' est fixé à vrai. { if ((initDate + allFiles[i]->output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep())) { error(0)<<"WARNING: void CContext::findEnabledFiles()"<getFileOutputName() <<"\" is less than the time step. File will not be written."<output_freq.getValue()) < (initDate + this->getCalendar()->getTimeStep())) { error(0)<<"WARNING: void CContext::findEnabledFiles()"<getFileOutputName() <<"\" is less than the time step. File will not be written."<("server2_dist_file_memory", distFileMemory); if (distFileMemory) distributeFileOverMemoryBandwith() ; else distributeFileOverBandwith() ; } void CContext::distributeFileOverBandwith(void) { double eps=std::numeric_limits::epsilon()*10 ; // If primary server if (hasServer && hasClient) { std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); int nbPools = clientPrimServer.size(); // (1) Find all enabled files in write mode // for (int i = 0; i < this->enabledFiles.size(); ++i) // { // if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) // enabledWriteModeFiles.push_back(enabledFiles[i]); // } // (2) Estimate the data volume for each file int size = this->enabledWriteModeFiles.size(); std::vector > dataSizeMap; double dataPerPool = 0; int nfield=0 ; ofs<enabledWriteModeFiles[i]; ofs<getId()< enabledFields = file->getEnabledFields(); size_t numEnabledFields = enabledFields.size(); ofs<getGlobalWrittenSize() ; ofs<grid->getId()<getGlobalWrittenSize()<getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; double dataSizeSec= dataSize/ outFreqSec; ofs< poolDataSize ; // multimap is not garanty to preserve stable sorting in c++98 but it seems it does for c++11 int j; double dataSize ; for (j = 0 ; j < nbPools ; ++j) poolDataSize.insert(std::pair(0.,j)) ; for (int i = dataSizeMap.size()-1; i >= 0; --i) { dataSize=(*poolDataSize.begin()).first ; j=(*poolDataSize.begin()).second ; dataSizeMap[i].second->setContextClient(clientPrimServer[j]); dataSize+=dataSizeMap[i].first; poolDataSize.erase(poolDataSize.begin()) ; poolDataSize.insert(std::pair(dataSize,j)) ; } for (std::multimap:: iterator it=poolDataSize.begin() ; it!=poolDataSize.end(); ++it) { #pragma omp critical (_output) info(30)<<"Load Balancing for servers (perfect=1) : "<second<<" : ratio "<first*1./dataPerPool<enabledReadModeFiles.size(); ++i) { enabledReadModeFiles[i]->setContextClient(client); } } else { for (int i = 0; i < this->enabledFiles.size(); ++i) enabledFiles[i]->setContextClient(client); } } void CContext::distributeFileOverMemoryBandwith(void) { // If primary server if (hasServer && hasClient) { int nbPools = clientPrimServer.size(); double ratio=0.5 ; ratio=CXios::getin("server2_dist_file_memory_ratio", ratio); int nFiles = this->enabledWriteModeFiles.size(); vector files(nFiles); vector grids; map gridMap ; string gridId; int gridIndex=0 ; for (size_t i = 0; i < nFiles; ++i) { StdSize dataSize=0; CFile* file = this->enabledWriteModeFiles[i]; std::vector enabledFields = file->getEnabledFields(); size_t numEnabledFields = enabledFields.size(); files[i].id_=file->getId() ; files[i].nbGrids_=numEnabledFields; files[i].assignedGrid_ = new int[files[i].nbGrids_] ; for (size_t j = 0; j < numEnabledFields; ++j) { gridId=enabledFields[j]->grid->getId() ; if (gridMap.find(gridId)==gridMap.end()) { gridMap[gridId]=gridIndex ; SDistGrid newGrid; grids.push_back(newGrid) ; gridIndex++ ; } files[i].assignedGrid_[j]=gridMap[gridId] ; grids[files[i].assignedGrid_[j]].size_=enabledFields[j]->getGlobalWrittenSize() ; dataSize += enabledFields[j]->getGlobalWrittenSize() ; // usefull } double outFreqSec = (Time)(calendar->getCurrentDate()+file->output_freq)-(Time)(calendar->getCurrentDate()) ; files[i].bandwith_= dataSize/ outFreqSec ; } double bandwith=0 ; double memory=0 ; for(int i=0; i memorySize(nbPools,0.) ; vector< set > serverGrids(nbPools) ; vector bandwithSize(nbPools,0.) ; for (size_t i = 0; i < nFiles; ++i) { bandwithSize[files[i].assignedServer_] += files[i].bandwith_* bandwith /ratio ; for(int j=0 ; jsetContextClient(clientPrimServer[files[i].assignedServer_]) ; delete [] files[i].assignedGrid_ ; } for (int i = 0; i < nbPools; ++i) { #pragma omp critical (_output) info(100)<<"Pool server level2 "<enabledReadModeFiles.size(); ++i) { enabledReadModeFiles[i]->setContextClient(client); } } else { for (int i = 0; i < this->enabledFiles.size(); ++i) enabledFiles[i]->setContextClient(client); } } /*! Find all files in write mode */ void CContext::findEnabledWriteModeFiles(void) { int size = this->enabledFiles.size(); for (int i = 0; i < size; ++i) { if (enabledFiles[i]->mode.isEmpty() || (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::write )) enabledWriteModeFiles.push_back(enabledFiles[i]); } } /*! Find all files in read mode */ void CContext::findEnabledReadModeFiles(void) { int size = this->enabledFiles.size(); for (int i = 0; i < size; ++i) { if (!enabledFiles[i]->mode.isEmpty() && enabledFiles[i]->mode.getValue() == CFile::mode_attr::read) enabledReadModeFiles.push_back(enabledFiles[i]); } } void CContext::closeAllFile(void) { std::vector::const_iterator it = this->enabledFiles.begin(), end = this->enabledFiles.end(); for (; it != end; it++) { #pragma omp critical (_output) info(30)<<"Closing File : "<<(*it)->getId()<close(); } } /*! \brief Dispatch event received from client Whenever a message is received in buffer of server, it will be processed depending on its event type. A new event type should be added in the switch list to make sure it processed on server side. \param [in] event: Received message */ bool CContext::dispatchEvent(CEventServer& event) { if (SuperClass::dispatchEvent(event)) return true; else { switch(event.type) { case EVENT_ID_CLOSE_DEFINITION : recvCloseDefinition(event); return true; break; case EVENT_ID_UPDATE_CALENDAR: recvUpdateCalendar(event); return true; break; case EVENT_ID_CREATE_FILE_HEADER : recvCreateFileHeader(event); return true; break; case EVENT_ID_POST_PROCESS: recvPostProcessing(event); return true; case EVENT_ID_SEND_REGISTRY: recvRegistry(event); return true; break; case EVENT_ID_POST_PROCESS_GLOBAL_ATTRIBUTES: recvPostProcessingGlobalAttributes(event); return true; break; case EVENT_ID_PROCESS_GRID_ENABLED_FIELDS: recvProcessingGridOfEnabledFields(event); return true; break; default : ERROR("bool CContext::dispatchEvent(CEventServer& event)", <<"Unknown Event"); return false; } } } //! Client side: Send a message to server to make it close void CContext::sendCloseDefinition(void) { // Use correct context client to send message int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; for (int i = 0; i < nbSrvPools; ++i) { CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; CEventClient event(getType(),EVENT_ID_CLOSE_DEFINITION); if (contextClientTmp->isServerLeader()) { CMessage msg; if (hasServer) msg<getIdServer(i); else msg<getIdServer(); const std::list& ranks = contextClientTmp->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank,1,msg); contextClientTmp->sendEvent(event); } else contextClientTmp->sendEvent(event); } } //! Server side: Receive a message of client announcing a context close void CContext::recvCloseDefinition(CEventServer& event) { CBufferIn* buffer=event.subEvents.begin()->buffer; string id; *buffer>>id; get(id)->closeDefinition(); } //! Client side: Send a message to update calendar in each time step void CContext::sendUpdateCalendar(int step) { // Use correct context client to send message int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; for (int i = 0; i < nbSrvPools; ++i) { CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; CEventClient event(getType(),EVENT_ID_UPDATE_CALENDAR); if (contextClientTmp->isServerLeader()) { CMessage msg; if (hasServer) msg<getIdServer(i)<getIdServer()<& ranks = contextClientTmp->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank,1,msg); contextClientTmp->sendEvent(event); } else contextClientTmp->sendEvent(event); } } //! Server side: Receive a message of client annoucing calendar update void CContext::recvUpdateCalendar(CEventServer& event) { CBufferIn* buffer=event.subEvents.begin()->buffer; string id; *buffer>>id; get(id)->recvUpdateCalendar(*buffer); } //! Server side: Receive a message of client annoucing calendar update void CContext::recvUpdateCalendar(CBufferIn& buffer) { int step; buffer>>step; updateCalendar(step); if (hasClient && hasServer) { sendUpdateCalendar(step); } } //! Client side: Send a message to create header part of netcdf file void CContext::sendCreateFileHeader(void) { // Use correct context client to send message // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; for (int i = 0; i < nbSrvPools; ++i) { CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; CEventClient event(getType(),EVENT_ID_CREATE_FILE_HEADER); if (contextClientTmp->isServerLeader()) { CMessage msg; if (hasServer) msg<getIdServer(i); else msg<getIdServer(); const std::list& ranks = contextClientTmp->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank,1,msg) ; contextClientTmp->sendEvent(event); } else contextClientTmp->sendEvent(event); } } //! Server side: Receive a message of client annoucing the creation of header part of netcdf file void CContext::recvCreateFileHeader(CEventServer& event) { CBufferIn* buffer=event.subEvents.begin()->buffer; string id; *buffer>>id; get(id)->recvCreateFileHeader(*buffer); } //! Server side: Receive a message of client annoucing the creation of header part of netcdf file void CContext::recvCreateFileHeader(CBufferIn& buffer) { if (!hasClient && hasServer) createFileHeader(); } //! Client side: Send a message to do some post processing on server void CContext::sendProcessingGridOfEnabledFields() { // Use correct context client to send message int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; for (int i = 0; i < nbSrvPools; ++i) { CContextClient* contextClientTmp = (0 != clientPrimServer.size()) ? clientPrimServer[i] : client; CEventClient event(getType(),EVENT_ID_PROCESS_GRID_ENABLED_FIELDS); if (contextClientTmp->isServerLeader()) { CMessage msg; if (hasServer) msg<getIdServer(i); else msg<getIdServer(); const std::list& ranks = contextClientTmp->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank,1,msg); contextClientTmp->sendEvent(event); } else contextClientTmp->sendEvent(event); } } //! Server side: Receive a message to do some post processing void CContext::recvProcessingGridOfEnabledFields(CEventServer& event) { CBufferIn* buffer=event.subEvents.begin()->buffer; string id; *buffer>>id; } //! Client side: Send a message to do some post processing on server void CContext::sendPostProcessing() { // Use correct context client to send message // int nbSrvPools = (hasServer) ? clientPrimServer.size() : 1; int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; for (int i = 0; i < nbSrvPools; ++i) { CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; CEventClient event(getType(),EVENT_ID_POST_PROCESS); if (contextClientTmp->isServerLeader()) { CMessage msg; if (hasServer) msg<getIdServer(i); else msg<getIdServer(); const std::list& ranks = contextClientTmp->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank,1,msg); contextClientTmp->sendEvent(event); } else contextClientTmp->sendEvent(event); } } //! Server side: Receive a message to do some post processing void CContext::recvPostProcessing(CEventServer& event) { CBufferIn* buffer=event.subEvents.begin()->buffer; string id; *buffer>>id; get(id)->recvPostProcessing(*buffer); } //! Server side: Receive a message to do some post processing void CContext::recvPostProcessing(CBufferIn& buffer) { CCalendarWrapper::get(CCalendarWrapper::GetDefName())->createCalendar(); postProcessing(); } const StdString& CContext::getIdServer() { if (hasClient) { idServer_ = this->getId(); idServer_ += "_server"; return idServer_; } if (hasServer) return (this->getId()); } const StdString& CContext::getIdServer(const int i) { idServer_ = this->getId(); idServer_ += "_server_"; idServer_ += boost::lexical_cast(i); return idServer_; } /*! \brief Do some simple post processings after parsing xml file After the xml file (iodef.xml) is parsed, it is necessary to build all relations among created object, e.g: inhertance among fields, domain, axis. After that, all fiels as well as their parents (reference fields), which will be written out into netcdf files, are processed */ void CContext::postProcessing() { if (isPostProcessed) return; // Make sure the calendar was correctly created if (!calendar) ERROR("CContext::postProcessing()", << "A calendar must be defined for the context \"" << getId() << "!\"") else if (calendar->getTimeStep() == NoneDu) ERROR("CContext::postProcessing()", << "A timestep must be defined for the context \"" << getId() << "!\"") // Calendar first update to set the current date equals to the start date calendar->update(0); // Find all inheritance in xml structure this->solveAllInheritance(); // ShowTree(info(10)); // Check if some axis, domains or grids are eligible to for compressed indexed output. // Warning: This must be done after solving the inheritance and before the rest of post-processing checkAxisDomainsGridsEligibilityForCompressedOutput(); // Check if some automatic time series should be generated // Warning: This must be done after solving the inheritance and before the rest of post-processing // The timeseries should only be prepared in client if (hasClient && !hasServer) prepareTimeseries(); //Initialisation du vecteur 'enabledFiles' contenant la liste des fichiers à sortir. findEnabledFiles(); findEnabledWriteModeFiles(); findEnabledReadModeFiles(); // For now, only read files with client and only one level server // if (hasClient && !hasServer) findEnabledReadModeFiles(); // Find all enabled fields of each file findAllEnabledFieldsInFiles(this->enabledWriteModeFiles); findAllEnabledFieldsInFiles(this->enabledReadModeFiles); // For now, only read files with client and only one level server // if (hasClient && !hasServer) // findAllEnabledFieldsInFiles(this->enabledReadModeFiles); if (hasClient && !hasServer) { initReadFiles(); // Try to read attributes of fields in file then fill in corresponding grid (or domain, axis) this->readAttributesOfEnabledFieldsInReadModeFiles(); } // Only search and rebuild all reference objects of enable fields, don't transform this->solveOnlyRefOfEnabledFields(false); // Search and rebuild all reference object of enabled fields, and transform this->solveAllRefOfEnabledFieldsAndTransform(false); // Find all fields with read access from the public API if (hasClient && !hasServer) findFieldsWithReadAccess(); // and solve the all reference for them if (hasClient && !hasServer) solveAllRefOfFieldsWithReadAccess(); isPostProcessed = true; } /*! * Compute the required buffer size to send the attributes (mostly those grid related). * \param maxEventSize [in/out] the size of the bigger event for each connected server * \param [in] contextClient * \param [in] bufferForWriting True if buffers are used for sending data for writing This flag is only true for client and server-1 for communication with server-2 */ std::map CContext::getAttributesBufferSize(std::map& maxEventSize, CContextClient* contextClient, bool bufferForWriting /*= "false"*/) { // As calendar attributes are sent even if there are no active files or fields, maps are initialized according the size of calendar attributes std::map attributesSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); maxEventSize = CCalendarWrapper::get(CCalendarWrapper::GetDefName())->getMinimumBufferSizeForAttributes(contextClient); std::vector& fileList = this->enabledFiles; size_t numEnabledFiles = fileList.size(); for (size_t i = 0; i < numEnabledFiles; ++i) { // CFile* file = this->enabledWriteModeFiles[i]; CFile* file = fileList[i]; std::vector enabledFields = file->getEnabledFields(); size_t numEnabledFields = enabledFields.size(); for (size_t j = 0; j < numEnabledFields; ++j) { const std::map mapSize = enabledFields[j]->getGridAttributesBufferSize(contextClient, bufferForWriting); std::map::const_iterator it = mapSize.begin(), itE = mapSize.end(); for (; it != itE; ++it) { // If attributesSize[it->first] does not exist, it will be zero-initialized // so we can use it safely without checking for its existence if (attributesSize[it->first] < it->second) attributesSize[it->first] = it->second; if (maxEventSize[it->first] < it->second) maxEventSize[it->first] = it->second; } } } return attributesSize; } /*! * Compute the required buffer size to send the fields data. * \param maxEventSize [in/out] the size of the bigger event for each connected server * \param [in] contextClient * \param [in] bufferForWriting True if buffers are used for sending data for writing This flag is only true for client and server-1 for communication with server-2 */ std::map CContext::getDataBufferSize(std::map& maxEventSize, CContextClient* contextClient, bool bufferForWriting /*= "false"*/) { std::map dataSize; // Find all reference domain and axis of all active fields std::vector& fileList = bufferForWriting ? this->enabledWriteModeFiles : this->enabledReadModeFiles; size_t numEnabledFiles = fileList.size(); for (size_t i = 0; i < numEnabledFiles; ++i) { // CFile* file = this->enabledFiles[i]; CFile* file = fileList[i]; if (file->getContextClient() == contextClient) { std::vector enabledFields = file->getEnabledFields(); size_t numEnabledFields = enabledFields.size(); for (size_t j = 0; j < numEnabledFields; ++j) { // const std::vector > mapSize = enabledFields[j]->getGridDataBufferSize(contextClient); const std::map mapSize = enabledFields[j]->getGridDataBufferSize(contextClient,bufferForWriting); std::map::const_iterator it = mapSize.begin(), itE = mapSize.end(); for (; it != itE; ++it) { // If dataSize[it->first] does not exist, it will be zero-initialized // so we can use it safely without checking for its existance if (CXios::isOptPerformance) dataSize[it->first] += it->second; else if (dataSize[it->first] < it->second) dataSize[it->first] = it->second; if (maxEventSize[it->first] < it->second) maxEventSize[it->first] = it->second; } } } } return dataSize; } //! Client side: Send infomation of active files (files are enabled to write out) void CContext::sendEnabledFiles(const std::vector& activeFiles) { int size = activeFiles.size(); // In a context, each type has a root definition, e.g: axis, domain, field. // Every object must be a child of one of these root definition. In this case // all new file objects created on server must be children of the root "file_definition" StdString fileDefRoot("file_definition"); CFileGroup* cfgrpPtr = CFileGroup::get(fileDefRoot); for (int i = 0; i < size; ++i) { CFile* f = activeFiles[i]; cfgrpPtr->sendCreateChild(f->getId(),f->getContextClient()); f->sendAllAttributesToServer(f->getContextClient()); f->sendAddAllVariables(f->getContextClient()); } } //! Client side: Send information of active fields (ones are written onto files) void CContext::sendEnabledFieldsInFiles(const std::vector& activeFiles) { int size = activeFiles.size(); for (int i = 0; i < size; ++i) { activeFiles[i]->sendEnabledFields(activeFiles[i]->getContextClient()); } } //! Client side: Check if the defined axis, domains and grids are eligible for compressed indexed output void CContext::checkAxisDomainsGridsEligibilityForCompressedOutput() { if (!hasClient) return; const vector allAxis = CAxis::getAll(); for (vector::const_iterator it = allAxis.begin(); it != allAxis.end(); it++) (*it)->checkEligibilityForCompressedOutput(); const vector allDomains = CDomain::getAll(); for (vector::const_iterator it = allDomains.begin(); it != allDomains.end(); it++) (*it)->checkEligibilityForCompressedOutput(); const vector allGrids = CGrid::getAll(); for (vector::const_iterator it = allGrids.begin(); it != allGrids.end(); it++) (*it)->checkEligibilityForCompressedOutput(); } //! Client side: Prepare the timeseries by adding the necessary files void CContext::prepareTimeseries() { if (!hasClient) return; const std::vector allFiles = CFile::getAll(); for (size_t i = 0; i < allFiles.size(); i++) { CFile* file = allFiles[i]; std::vector fileVars, fieldVars, vars = file->getAllVariables(); for (size_t k = 0; k < vars.size(); k++) { CVariable* var = vars[k]; if (var->ts_target.isEmpty() || var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both) fileVars.push_back(var); if (!var->ts_target.isEmpty() && (var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both)) fieldVars.push_back(var); } if (!file->timeseries.isEmpty() && file->timeseries != CFile::timeseries_attr::none) { StdString fileNameStr("%file_name%") ; StdString tsPrefix = !file->ts_prefix.isEmpty() ? file->ts_prefix : fileNameStr ; StdString fileName=file->getFileOutputName(); size_t pos=tsPrefix.find(fileNameStr) ; while (pos!=std::string::npos) { tsPrefix=tsPrefix.replace(pos,fileNameStr.size(),fileName) ; pos=tsPrefix.find(fileNameStr) ; } const std::vector allFields = file->getAllFields(); for (size_t j = 0; j < allFields.size(); j++) { CField* field = allFields[j]; if (!field->ts_enabled.isEmpty() && field->ts_enabled) { CFile* tsFile = CFile::create(); tsFile->duplicateAttributes(file); // Add variables originating from file and targeted to timeserie file for (size_t k = 0; k < fileVars.size(); k++) tsFile->getVirtualVariableGroup()->addChild(fileVars[k]); tsFile->name = tsPrefix + "_"; if (!field->name.isEmpty()) tsFile->name.get() += field->name; else if (field->hasDirectFieldReference()) // We cannot use getBaseFieldReference() just yet tsFile->name.get() += field->field_ref; else tsFile->name.get() += field->getId(); if (!field->ts_split_freq.isEmpty()) tsFile->split_freq = field->ts_split_freq; CField* tsField = tsFile->addField(); tsField->field_ref = field->getId(); // Add variables originating from file and targeted to timeserie field for (size_t k = 0; k < fieldVars.size(); k++) tsField->getVirtualVariableGroup()->addChild(fieldVars[k]); vars = field->getAllVariables(); for (size_t k = 0; k < vars.size(); k++) { CVariable* var = vars[k]; // Add variables originating from field and targeted to timeserie field if (var->ts_target.isEmpty() || var->ts_target == CVariable::ts_target_attr::field || var->ts_target == CVariable::ts_target_attr::both) tsField->getVirtualVariableGroup()->addChild(var); // Add variables originating from field and targeted to timeserie file if (!var->ts_target.isEmpty() && (var->ts_target == CVariable::ts_target_attr::file || var->ts_target == CVariable::ts_target_attr::both)) tsFile->getVirtualVariableGroup()->addChild(var); } tsFile->solveFieldRefInheritance(true); if (file->timeseries == CFile::timeseries_attr::exclusive) field->enabled = false; } } // Finally disable the original file is need be if (file->timeseries == CFile::timeseries_attr::only) file->enabled = false; } } } //! Client side: Send information of reference grid of active fields void CContext::sendRefGrid(const std::vector& activeFiles) { std::set gridIds; int sizeFile = activeFiles.size(); CFile* filePtr(NULL); // Firstly, find all reference grids of all active fields for (int i = 0; i < sizeFile; ++i) { filePtr = activeFiles[i]; std::vector enabledFields = filePtr->getEnabledFields(); int sizeField = enabledFields.size(); for (int numField = 0; numField < sizeField; ++numField) { if (0 != enabledFields[numField]->getRelGrid()) gridIds.insert(CGrid::get(enabledFields[numField]->getRelGrid())->getId()); } } // Create all reference grids on server side StdString gridDefRoot("grid_definition"); CGridGroup* gridPtr = CGridGroup::get(gridDefRoot); std::set::const_iterator it, itE = gridIds.end(); for (it = gridIds.begin(); it != itE; ++it) { gridPtr->sendCreateChild(*it); CGrid::get(*it)->sendAllAttributesToServer(); CGrid::get(*it)->sendAllDomains(); CGrid::get(*it)->sendAllAxis(); CGrid::get(*it)->sendAllScalars(); } } //! Client side: Send information of reference domain, axis and scalar of active fields void CContext::sendRefDomainsAxisScalars(const std::vector& activeFiles) { std::set domainIds, axisIds, scalarIds; // Find all reference domain and axis of all active fields int numEnabledFiles = activeFiles.size(); for (int i = 0; i < numEnabledFiles; ++i) { std::vector enabledFields = activeFiles[i]->getEnabledFields(); int numEnabledFields = enabledFields.size(); for (int j = 0; j < numEnabledFields; ++j) { const std::vector& prDomAxisScalarId = enabledFields[j]->getRefDomainAxisIds(); if ("" != prDomAxisScalarId[0]) domainIds.insert(prDomAxisScalarId[0]); if ("" != prDomAxisScalarId[1]) axisIds.insert(prDomAxisScalarId[1]); if ("" != prDomAxisScalarId[2]) scalarIds.insert(prDomAxisScalarId[2]); } } // Create all reference axis on server side std::set::iterator itDom, itAxis, itScalar; std::set::const_iterator itE; StdString scalarDefRoot("scalar_definition"); CScalarGroup* scalarPtr = CScalarGroup::get(scalarDefRoot); itE = scalarIds.end(); for (itScalar = scalarIds.begin(); itScalar != itE; ++itScalar) { if (!itScalar->empty()) { scalarPtr->sendCreateChild(*itScalar); CScalar::get(*itScalar)->sendAllAttributesToServer(); } } StdString axiDefRoot("axis_definition"); CAxisGroup* axisPtr = CAxisGroup::get(axiDefRoot); itE = axisIds.end(); for (itAxis = axisIds.begin(); itAxis != itE; ++itAxis) { if (!itAxis->empty()) { axisPtr->sendCreateChild(*itAxis); CAxis::get(*itAxis)->sendAllAttributesToServer(); } } // Create all reference domains on server side StdString domDefRoot("domain_definition"); CDomainGroup* domPtr = CDomainGroup::get(domDefRoot); itE = domainIds.end(); for (itDom = domainIds.begin(); itDom != itE; ++itDom) { if (!itDom->empty()) { domPtr->sendCreateChild(*itDom); CDomain::get(*itDom)->sendAllAttributesToServer(); } } } //! Update calendar in each time step void CContext::updateCalendar(int step) { int prevStep = calendar->getStep(); if (prevStep < step) { if (hasClient && !hasServer) // For now we only use server level 1 to read data { doPreTimestepOperationsForEnabledReadModeFiles(); } #pragma omp critical (_output) info(50) << "updateCalendar : before : " << calendar->getCurrentDate() << endl; calendar->update(step); #pragma omp critical (_output) info(50) << "updateCalendar : after : " << calendar->getCurrentDate() << endl; #ifdef XIOS_MEMTRACK_LIGHT #pragma omp critical (_output) info(50) << " Current memory used by XIOS : "<< MemTrack::getCurrentMemorySize()*1.0/(1024*1024)<<" Mbyte, at timestep "<getId()<getCurrentDate()); } } else if (prevStep == step) { #pragma omp critical (_output) info(50) << "updateCalendar: already at step " << step << ", no operation done." << endl; } else // if (prevStep > step) ERROR("void CContext::updateCalendar(int step)", << "Illegal calendar update: previous step was " << prevStep << ", new step " << step << "is in the past!") } void CContext::initReadFiles(void) { vector::const_iterator it; for (it=enabledReadModeFiles.begin(); it != enabledReadModeFiles.end(); it++) { (*it)->initRead(); } } //! Server side: Create header of netcdf file void CContext::createFileHeader(void) { vector::const_iterator it; for (it=enabledFiles.begin(); it != enabledFiles.end(); it++) // for (it=enabledWriteModeFiles.begin(); it != enabledWriteModeFiles.end(); it++) { (*it)->initWrite(); } } //! Get current context CContext* CContext::getCurrent(void) { return CObjectFactory::GetObject(CObjectFactory::GetCurrentContextId()).get(); } /*! \brief Set context with an id be the current context \param [in] id identity of context to be set to current */ void CContext::setCurrent(const string& id) { CObjectFactory::SetCurrentContextId(id); CGroupFactory::SetCurrentContextId(id); } /*! \brief Create a context with specific id \param [in] id identity of new context \return pointer to the new context or already-existed one with identity id */ CContext* CContext::create(const StdString& id) { CContext::setCurrent(id); bool hasctxt = CContext::has(id); CContext* context = CObjectFactory::CreateObject(id).get(); getRoot(); if (!hasctxt) CGroupFactory::AddChild(*root_ptr, context->getShared()); #define DECLARE_NODE(Name_, name_) \ C##Name_##Definition::create(C##Name_##Definition::GetDefName()); #define DECLARE_NODE_PAR(Name_, name_) #include "node_type.conf" return (context); } //! Server side: Receive a message to do some post processing void CContext::recvRegistry(CEventServer& event) { CBufferIn* buffer=event.subEvents.begin()->buffer; string id; *buffer>>id; get(id)->recvRegistry(*buffer); } void CContext::recvRegistry(CBufferIn& buffer) { if (server->intraCommRank==0) { CRegistry registry(server->intraComm) ; registry.fromBuffer(buffer) ; registryOut->mergeRegistry(registry) ; } } void CContext::sendRegistry(void) { registryOut->hierarchicalGatherRegistry() ; // Use correct context client to send message int nbSrvPools = (this->hasServer) ? (this->hasClient ? this->clientPrimServer.size() : 0) : 1; for (int i = 0; i < nbSrvPools; ++i) { CContextClient* contextClientTmp = (hasServer) ? clientPrimServer[i] : client; CEventClient event(CContext::GetType(), CContext::EVENT_ID_SEND_REGISTRY); if (contextClientTmp->isServerLeader()) { CMessage msg ; if (hasServer) msg<getIdServer(i); else msg<getIdServer(); if (contextClientTmp->clientRank==0) msg<<*registryOut ; const std::list& ranks = contextClientTmp->getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank,1,msg); contextClientTmp->sendEvent(event); } else contextClientTmp->sendEvent(event); } } /*! * \fn bool CContext::isFinalized(void) * Context is finalized if it received context post finalize event. */ bool CContext::isFinalized(void) { return finalized; } } // namespace xios