#include "globalScopeData.hpp" #include "xios_spl.hpp" #include "cxios.hpp" #include "client.hpp" #include #include "type.hpp" #include "context.hpp" #include "context_client.hpp" #include "oasis_cinterface.hpp" #include "mpi.hpp" #include "timer.hpp" #include "buffer_client.hpp" #include "string_tools.hpp" #include "ressources_manager.hpp" #include "services_manager.hpp" #include #include namespace xios { const double serverPublishDefaultTimeout=10; MPI_Comm CClient::intraComm ; MPI_Comm CClient::interComm ; MPI_Comm CClient::clientsComm_ ; std::list CClient::contextInterComms; int CClient::serverLeader ; bool CClient::is_MPI_Initialized ; int CClient::rank_ = INVALID_RANK; StdOFStream CClient::m_infoStream; StdOFStream CClient::m_errorStream; CPoolRessource* CClient::poolRessource_=nullptr ; MPI_Comm& CClient::getInterComm(void) { return (interComm); } ///--------------------------------------------------------------- /*! * \fn void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm) * Function creates intraComm (CClient::intraComm) for client group with id=codeId and interComm (CClient::interComm) between client and server groups. * \param [in] codeId identity of context. * \param [in/out] localComm local communicator. * \param [in/out] returnComm (intra)communicator of client group. */ void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm) { MPI_Comm clientComm ; // initialize MPI if not initialized int initialized ; MPI_Initialized(&initialized) ; if (initialized) is_MPI_Initialized=true ; else is_MPI_Initialized=false ; MPI_Comm globalComm=CXios::getGlobalComm() ; ///////////////////////////////////////// ///////////// PART 1 //////////////////// ///////////////////////////////////////// // localComm isn't given if (localComm == MPI_COMM_NULL) { // don't use OASIS if (!CXios::usingOasis) { if (!is_MPI_Initialized) { MPI_Init(NULL, NULL); } CTimer::get("XIOS").resume() ; CTimer::get("XIOS init/finalize",false).resume() ; // split the global communicator // get hash from all model to attribute a unique color (int) and then split to get client communicator // every mpi process of globalComm (MPI_COMM_WORLD) must participate int commRank, commSize ; MPI_Comm_rank(globalComm,&commRank) ; MPI_Comm_size(globalComm,&commSize) ; std::hash hashString ; size_t hashClient=hashString(codeId) ; size_t* hashAll = new size_t[commSize] ; MPI_Allgather(&hashClient,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_LONG,globalComm) ; int color=0 ; set listHash ; for(int i=0 ; i<=commRank ; i++) if (listHash.count(hashAll[i])==0) { listHash.insert(hashAll[i]) ; color=color+1 ; } delete[] hashAll ; MPI_Comm_split(globalComm, color, commRank, &clientComm) ; } else // using oasis to split communicator { if (!is_MPI_Initialized) oasis_init(codeId) ; oasis_get_localcomm(clientComm) ; } } else // localComm is given { MPI_Comm_dup(localComm,&clientComm) ; } ///////////////////////////////////////// ///////////// PART 2 //////////////////// ///////////////////////////////////////// // Create the XIOS communicator for every process which is related // to XIOS, as well on client side as on server side MPI_Comm xiosGlobalComm ; string strIds=CXios::getin("clients_code_id","") ; vector clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ; if (strIds.empty()) { // no code Ids given, suppose XIOS initialisation is global int commRank, commGlobalRank, serverLeader, clientLeader,serverRemoteLeader,clientRemoteLeader ; MPI_Comm splitComm,interComm ; 
    ///---------------------------------------------------------------
    /*!
     * \fn void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
     * Create the intracommunicator (CClient::intraComm) of the client group with id=codeId and the
     * intercommunicator (CClient::interComm) between the client and server groups.
     * \param [in] codeId identity of context.
     * \param [in,out] localComm local communicator.
     * \param [in,out] returnComm (intra)communicator of the client group.
     */
    void CClient::initialize(const string& codeId, MPI_Comm& localComm, MPI_Comm& returnComm)
    {
      MPI_Comm clientComm ;

      // initialize MPI if not initialized
      int initialized ;
      MPI_Initialized(&initialized) ;
      if (initialized) is_MPI_Initialized=true ;
      else is_MPI_Initialized=false ;

      MPI_Comm globalComm=CXios::getGlobalComm() ;

      /////////////////////////////////////////
      ///////////// PART 1 ////////////////////
      /////////////////////////////////////////

      // localComm isn't given
      if (localComm == MPI_COMM_NULL)
      {
        // don't use OASIS
        if (!CXios::usingOasis)
        {
          if (!is_MPI_Initialized)
          {
            MPI_Init(NULL, NULL);
          }
          CTimer::get("XIOS").resume() ;
          CTimer::get("XIOS init/finalize",false).resume() ;

          // Split the global communicator: hash every code id to attribute a unique color (int),
          // then split to get the client communicator. Every MPI process of globalComm
          // (MPI_COMM_WORLD) must participate.
          int commRank, commSize ;
          MPI_Comm_rank(globalComm,&commRank) ;
          MPI_Comm_size(globalComm,&commSize) ;

          std::hash<string> hashString ;
          size_t hashClient=hashString(codeId) ;

          size_t* hashAll = new size_t[commSize] ;
          MPI_Allgather(&hashClient,1,MPI_UNSIGNED_LONG,hashAll,1,MPI_UNSIGNED_LONG,globalComm) ;

          // the color is the number of distinct hashes found among ranks 0..commRank
          int color=0 ;
          set<size_t> listHash ;
          for(int i=0 ; i<=commRank ; i++)
            if (listHash.count(hashAll[i])==0)
            {
              listHash.insert(hashAll[i]) ;
              color=color+1 ;
            }
          delete[] hashAll ;

          MPI_Comm_split(globalComm, color, commRank, &clientComm) ;
        }
        else // using oasis to split communicator
        {
          if (!is_MPI_Initialized) oasis_init(codeId) ;
          oasis_get_localcomm(clientComm) ;
        }
      }
      else // localComm is given
      {
        MPI_Comm_dup(localComm,&clientComm) ;
      }

      /////////////////////////////////////////
      ///////////// PART 2 ////////////////////
      /////////////////////////////////////////

      // Create the XIOS communicator for every process which is related
      // to XIOS, on the client side as well as on the server side.
      MPI_Comm xiosGlobalComm ;
      string strIds=CXios::getin<string>("clients_code_id","") ;
      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;
      if (strIds.empty())
      {
        // no code ids given, suppose that the XIOS initialisation is global
        int commRank, commGlobalRank, serverLeader, clientLeader, serverRemoteLeader, clientRemoteLeader ;
        MPI_Comm splitComm, interComm ;
        MPI_Comm_rank(globalComm,&commGlobalRank) ;
        MPI_Comm_split(globalComm, 0, commGlobalRank, &splitComm) ;

        int splitCommSize, globalCommSize ;
        MPI_Comm_size(splitComm,&splitCommSize) ;
        MPI_Comm_size(globalComm,&globalCommSize) ;
        if (splitCommSize==globalCommSize) // no server
        {
          MPI_Comm_dup(globalComm,&xiosGlobalComm) ;
          CXios::setXiosComm(xiosGlobalComm) ;
        }
        else
        {
          MPI_Comm_rank(splitComm,&commRank) ;
          if (commRank==0) clientLeader=commGlobalRank ;
          else clientLeader=0 ;
          serverLeader=0 ;
          MPI_Allreduce(&clientLeader,&clientRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
          MPI_Allreduce(&serverLeader,&serverRemoteLeader,1,MPI_INT,MPI_SUM,globalComm) ;
          MPI_Intercomm_create(splitComm, 0, globalComm, serverRemoteLeader, 1341, &interComm) ;
          MPI_Intercomm_merge(interComm, true, &xiosGlobalComm) ;
          CXios::setXiosComm(xiosGlobalComm) ;
        }
      }
      else
      {
        xiosGlobalCommByFileExchange(clientComm, codeId) ;
      }

      int commRank ;
      MPI_Comm_rank(CXios::getXiosComm(), &commRank) ;
      MPI_Comm_split(CXios::getXiosComm(), false, commRank, &clientsComm_) ;  // false (=0) is the color of the client processes

      // is using server or not ?
      int xiosCommSize, clientsCommSize ;
      MPI_Comm_size(CXios::getXiosComm(), &xiosCommSize) ;
      MPI_Comm_size(clientsComm_, &clientsCommSize) ;
      if (xiosCommSize==clientsCommSize) CXios::setUsingServer() ;
      else CXios::setNotUsingServer() ;

      CXios::setGlobalRegistry(new CRegistry(clientsComm_)) ;

      /////////////////////////////////////////
      ///////////// PART 3 ////////////////////
      /////////////////////////////////////////

      CXios::launchDaemonsManager(false) ;
      poolRessource_ = new CPoolRessource(clientComm, codeId) ;

      /////////////////////////////////////////
      ///////////// PART 4 ////////////////////
      /////////////////////////////////////////

      returnComm = clientComm ;
    }

    /*!
     * \brief Build the global XIOS communicator by exchanging ranks through small files published
     * on disk, when a list of client code ids is given through the "clients_code_id" variable.
     */
    void CClient::xiosGlobalCommByFileExchange(MPI_Comm clientComm, const string& codeId)
    {
      MPI_Comm globalComm=CXios::getGlobalComm() ;
      MPI_Comm xiosGlobalComm ;

      string strIds=CXios::getin<string>("clients_code_id","") ;
      vector<string> clientsCodeId=splitRegex(strIds,"\\s*,\\s*") ;

      int commRank, globalRank, clientRank, serverRank ;
      MPI_Comm_rank(clientComm, &commRank) ;
      MPI_Comm_rank(globalComm, &globalRank) ;
      string clientFileName("__xios_publisher::"+codeId+"__to_remove__") ;

      int error ;

      if (commRank==0) // if root process, publish name
      {
        std::ofstream ofs (clientFileName, std::ofstream::out);
        ofs<<globalRank ;
        ofs.close();

        // then wait for the server to publish its own rank, with a timeout
        // (assumed: the server root publishes its rank under the XIOS code id)
        std::ifstream ifs ;
        string fileName("__xios_publisher::"+CXios::xiosCodeId+"__") ;
        double timeout = CXios::getin<double>("server_puplish_timeout",serverPublishDefaultTimeout) ;
        double time ;

        do
        {
          CTimer::get("server_publish_timeout").resume() ;
          ifs.clear() ;
          ifs.open(fileName, std::ifstream::in) ;
          CTimer::get("server_publish_timeout").suspend() ;
        } while (ifs.fail() && CTimer::get("server_publish_timeout").getCumulatedTime()<timeout) ;

        if (CTimer::get("server_publish_timeout").getCumulatedTime()>=timeout || ifs.fail())
        {
          ifs.clear() ;
          ifs.close() ;
          ifs.clear() ;
          error=true ;
        }
        else
        {
          ifs>>serverRank ;
          ifs.close() ;
          error=false ;
        }
      }
      MPI_Bcast(&error,1,MPI_INT,0,clientComm) ;

      if (error==false) // you have a server
      {
        MPI_Comm intraComm ;
        MPI_Comm_dup(clientComm,&intraComm) ;
        MPI_Comm interComm ;

        int pos=0 ;
        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;

        bool high=true ;
        for(int i=pos ; i<clientsCodeId.size() ; i++)
        {
          // assumed reconstruction: merge this client group, then the following ones, onto the
          // communicator led by the server root published above (the remote leader is only
          // significant on the local root; the tag value is assumed)
          MPI_Intercomm_create(intraComm, 0, globalComm, serverRank, 3141, &interComm) ;
          MPI_Intercomm_merge(interComm, high, &intraComm) ;
          high=false ;
        }
        xiosGlobalComm=intraComm ;
      }
      else // no server published its rank: the clients build the XIOS communicator among themselves
      {
        vector<int> clientsRank(clientsCodeId.size()) ;

        if (commRank==0)
        {
          // read the global rank published by the root process of every client code
          for(int i=0;i<clientsRank.size();i++)
          {
            std::ifstream ifs ;
            do
            {
              ifs.clear() ;
              ifs.open("__xios_publisher::"+clientsCodeId[i]+"__to_remove__", std::ifstream::in) ;
            } while (ifs.fail()) ;
            ifs>>clientsRank[i] ;
            ifs.close() ;
          }
        }

        int client ;
        MPI_Comm intraComm ;
        MPI_Comm_dup(clientComm,&intraComm) ;
        MPI_Comm interComm ;

        int pos=0 ;
        for(int i=0 ; codeId!=clientsCodeId[i]; i++) pos=pos+1 ;

        bool high=true ;
        for(int i=pos+1 ; i<clientsCodeId.size() ; i++)
        {
          // assumed reconstruction: the first listed code plays the role the server plays above and
          // merges the remaining client groups one by one
          if (codeId==clientsCodeId[0]) MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[i], 3141, &interComm) ;
          else                          MPI_Intercomm_create(intraComm, 0, globalComm, clientsRank[0], 3141, &interComm) ;
          MPI_Intercomm_merge(interComm, high, &intraComm) ;
          high=false ;
        }
        xiosGlobalComm=intraComm ;
      }

      CXios::setXiosComm(xiosGlobalComm) ;
    }
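    // The file-based rendezvous used above boils down to the small pattern sketched below. The
    // sketch is illustrative only and not part of the library: the helper name, the file name and
    // the sleep-free polling are simplifications of what the code above does with CTimer.
    //
    //   #include <fstream>
    //   #include <string>
    //
    //   // Poll a file written by a publisher until it can be read or the timeout expires.
    //   bool waitForPublishedRank(const std::string& fileName, double timeout, double& elapsed, int& rank)
    //   {
    //     std::ifstream ifs ;
    //     while (elapsed < timeout)           // the caller accumulates the elapsed time between polls
    //     {
    //       ifs.clear() ;
    //       ifs.open(fileName, std::ifstream::in) ;
    //       if (!ifs.fail()) { ifs >> rank ; ifs.close() ; return true ; }
    //       elapsed += 1. ;                   // placeholder for a real timer measurement
    //     }
    //     return false ;                      // nobody published before the timeout
    //   }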
    /*!
     * \brief Register a new context on the client side: create the associated CLIENT and IO_SERVER
     * services on the pool of this code and wait until the corresponding server context exists.
     * \param [in] id identity of the context.
     * \param [in] contextComm communicator of the processes taking part in the context.
     */
    void CClient::registerContext(const string& id, MPI_Comm contextComm)
    {
      int commRank ;
      MPI_Comm_rank(contextComm,&commRank) ;

      getPoolRessource()->createService(contextComm, id, 0, CServicesManager::CLIENT, 1) ;
      getPoolRessource()->createService(contextComm, CXios::defaultServerId, 0, CServicesManager::IO_SERVER, 1) ;

      if (commRank==0)
        while (!CXios::getServicesManager()->hasService(getPoolRessource()->getId(), id, 0)) { CXios::getDaemonsManager()->eventLoop(); }

      if (commRank==0) CXios::getContextsManager()->createServerContext(getPoolRessource()->getId(), id, 0, id) ;

      int type=CServicesManager::CLIENT ;
      string name = CXios::getContextsManager()->getServerContextName(getPoolRessource()->getId(), id, 0, type, id) ;
      while (!CXios::getContextsManager()->hasContext(name, contextComm) )
      {
        CXios::getDaemonsManager()->eventLoop() ;
      }
    }

    /*!
     * \fn void CClient::callOasisEnddef(void)
     * \brief Send the order to the servers to call "oasis_enddef". It must be done by each compound
     * of models before calling oasis_enddef on the client side.
     * Function is only called by the client.
     */
    void CClient::callOasisEnddef(void)
    {
      bool oasisEnddef=CXios::getin<bool>("call_oasis_enddef",true) ;
      if (!oasisEnddef) ERROR("void CClient::callOasisEnddef(void)", <<"Function xios_oasis_enddef called but variable <call_oasis_enddef> is set to false."<<endl
                                                                     <<"Variable <call_oasis_enddef> must be set to true"<<endl) ;
    }

    /*!
     * \brief Finalize the client side: gather and write the registry, stop the daemons manager and
     * finalize MPI (or OASIS) when it was initialized by XIOS.
     */
    void CClient::finalize(void)
    {
      int commRank ;
      MPI_Comm_rank(clientsComm_, &commRank) ;   // assumed: rank taken in the clients communicator
      if (commRank==0) CXios::getRessourcesManager()->finalize() ;

      auto globalRegistry=CXios::getGlobalRegistry() ;
      globalRegistry->hierarchicalGatherRegistry() ;

      if (commRank==0)
      {
        info(80)<<"Write data base Registry"<<endl<<globalRegistry->toString()<<endl ;
        globalRegistry->toFile("xios_registry.bin") ;
      }
      delete globalRegistry ;

      CTimer::get("XIOS init/finalize",false).suspend() ;
      CTimer::get("XIOS").suspend() ;
      CXios::finalizeDaemonsManager() ;

      if (!is_MPI_Initialized)
      {
        if (CXios::usingOasis) oasis_finalize();
        else MPI_Finalize() ;
      }

      info(20) << "Client side context is finalized"<<endl ;
    }

    /*!
     * \brief Open a file with a name built as fileName+rank+ext and attach it to the file buffer \p fb.
     * \param fileName [in] prototype file name
     * \param ext [in] extension of the file
     * \param fb [in,out] file buffer to open
     */
    void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
    {
      int rank ;
      MPI_Comm_rank(CXios::getGlobalComm(), &rank) ;   // assumed: log files are suffixed by the global rank
      StdStringStream fileNameClient ;
      fileNameClient << fileName << "_" << rank << ext ;
      fb->open(fileNameClient.str().c_str(), std::ios::out);
      if (!fb->is_open())
        ERROR("void CClient::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
              << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the client log(s).");
    }

    /*!
     * \brief Open a file stream to write the info logs
     * Open a file stream with a specific file name suffix+rank
     * to write the info logs.
     * \param fileName [in] prototype file name
     */
    void CClient::openInfoStream(const StdString& fileName)
    {
      std::filebuf* fb = m_infoStream.rdbuf();
      openStream(fileName, ".out", fb);

      info.write2File(fb);
      report.write2File(fb);
    }

    //! Write the info logs to standard output
    void CClient::openInfoStream()
    {
      info.write2StdOut();
      report.write2StdOut();
    }

    //! Close the info logs file if it is open
    void CClient::closeInfoStream()
    {
      if (m_infoStream.is_open()) m_infoStream.close();
    }

    /*!
     * \brief Open a file stream to write the error log
     * Open a file stream with a specific file name suffix+rank
     * to write the error log.
     * \param fileName [in] prototype file name
     */
    void CClient::openErrorStream(const StdString& fileName)
    {
      std::filebuf* fb = m_errorStream.rdbuf();
      openStream(fileName, ".err", fb);

      error.write2File(fb);
    }

    //! Write the error log to standard error output
    void CClient::openErrorStream()
    {
      error.write2StdErr();
    }

    //! Close the error log file if it is open
    void CClient::closeErrorStream()
    {
      if (m_errorStream.is_open()) m_errorStream.close();
    }
}
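// Illustrative sketch only, not part of the library: typical use of the logging helpers defined
// above. The "client_output" prefix is a placeholder; each process writes its info/report logs to
// a file named from the prefix, its rank and the ".out" extension, and its errors to ".err".
//
//   CClient::openInfoStream("client_output") ;   // info and report logs go to a per-rank .out file
//   CClient::openErrorStream("client_output") ;  // error log goes to a per-rank .err file
//   // ... run ...
//   CClient::closeInfoStream() ;
//   CClient::closeErrorStream() ;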