#include "globalScopeData.hpp" #include "xios_spl.hpp" #include "cxios.hpp" #include "server.hpp" #include "client.hpp" #include "type.hpp" #include "context.hpp" #include "object_template.hpp" #include "oasis_cinterface.hpp" #include #include #include "mpi.hpp" #include "tracer.hpp" #include "timer.hpp" #include "event_scheduler.hpp" namespace xios { MPI_Comm CServer::intraComm ; list CServer::interCommLeft ; list CServer::interCommRight ; // list CServer::interComm ; std::list CServer::contextInterComms; std::list CServer::contextIntraComms; int CServer::serverLevel = 0 ; int CServer::serverLeader_ = 0; int CServer::serverSize_ = 0; int CServer::nbPools = 0; int CServer::poolId = 0; int CServer::nbContexts_ = 0; bool CServer::isRoot = false ; int CServer::rank_ = INVALID_RANK; StdOFStream CServer::m_infoStream; StdOFStream CServer::m_errorStream; map CServer::contextList ; bool CServer::finished=false ; bool CServer::is_MPI_Initialized ; CEventScheduler* CServer::eventScheduler = 0; //--------------------------------------------------------------- /*! * \fn void CServer::initialize(void) * Creates intraComm for each possible type of servers (classical, primary or secondary). * In case of secondary servers intraComm is created for each secondary server pool. * (For now the assumption is that there is one proc per pool.) * Creates the following lists of interComms: * classical server -- interCommLeft * primary server -- interCommLeft and interCommRight * secondary server -- interComm for each pool. */ void CServer::initialize(void) { int initialized ; MPI_Initialized(&initialized) ; if (initialized) is_MPI_Initialized=true ; else is_MPI_Initialized=false ; // Not using OASIS if (!CXios::usingOasis) { if (!is_MPI_Initialized) { MPI_Init(NULL, NULL); } CTimer::get("XIOS").resume() ; boost::hash hashString ; unsigned long hashServer = hashString(CXios::xiosCodeId); unsigned long* hashAll ; // int rank ; int size ; int myColor ; int i,c ; MPI_Comm newComm, serversComm; MPI_Comm_size(CXios::globalComm, &size) ; MPI_Comm_rank(CXios::globalComm, &rank_); hashAll=new unsigned long[size] ; MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ; map colors ; map leaders ; map lastProcesses ; // needed in case of two server levels map::iterator it ; int nbSrv = 0; for(i=0,c=0;ifirst!=hashServer) { clientLeader=it->second ; int intraCommSize, intraCommRank ; MPI_Comm_size(intraComm,&intraCommSize) ; MPI_Comm_rank(intraComm,&intraCommRank) ; info(50)<<"intercommCreate::server (classical mode) "<first != hashServer) { clientLeader=it->second ; int intraCommSize, intraCommRank ; MPI_Comm_size(intraComm, &intraCommSize) ; MPI_Comm_rank(intraComm, &intraCommRank) ; info(50)<<"intercommCreate::server (server level 1) "<second; } for (int i = 0; i < nbPools; ++i) { srvSndLeader = serverLeader_ + serverSize_ - nbPools + i; int intraCommSize, intraCommRank ; MPI_Comm_size(intraComm, &intraCommSize) ; MPI_Comm_rank(intraComm, &intraCommRank) ; info(50)<<"intercommCreate::client (server level 1) "<("oasis_codes_id") ; vector splitted ; boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ; vector::iterator it ; MPI_Comm newComm ; int globalRank ; MPI_Comm_rank(CXios::globalComm,&globalRank); for(it=splitted.begin();it!=splitted.end();it++) { oasis_get_intercomm(newComm,*it) ; // interComm.push_back(newComm) ; if ( !CXios::usingServer2) interCommLeft.push_back(newComm) ; else { if (serverLevel == 1) { info(50)<<"intercommCreate::server "<::iterator it = 
    void CServer::finalize(void)
    {
      CTimer::get("XIOS").suspend() ;

      delete eventScheduler ;

      for (std::list<MPI_Comm>::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++)
        MPI_Comm_free(&(*it));

      for (std::list<MPI_Comm>::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++)
        MPI_Comm_free(&(*it));

//      for (std::list<MPI_Comm>::iterator it = interComm.begin(); it != interComm.end(); it++)
//        MPI_Comm_free(&(*it));

//      for (std::list<MPI_Comm>::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++)
//        MPI_Comm_free(&(*it));

      for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++)
        MPI_Comm_free(&(*it));

      MPI_Comm_free(&intraComm);

      if (!is_MPI_Initialized)
      {
        if (CXios::usingOasis) oasis_finalize();
        else MPI_Finalize() ;
      }

      report(0)<<"Performance report : Time spent for XIOS : "<<CTimer::get("XIOS server").getCumulatedTime()<<endl ;
      report(100)<<CTimer::getAllCumulatedTime()<<endl ;
    }

    void CServer::eventLoop(void)
    {
      bool stop=false ;

      CTimer::get("XIOS server").resume() ;
      while(!stop)
      {
        if (isRoot)
        {
          // Root of the server intracommunicator: listen to the clients
          // (context registration and finalize) and relay to the other ranks
          listenContext();
          listenRootContext();
          if (!finished) listenFinalize() ;
        }
        else
        {
          // Non-root ranks only listen to their root
          listenRootContext();
          if (!finished) listenRootFinalize() ;
        }

        contextEventLoop() ;
        if (finished && contextList.empty()) stop=true ;
        eventScheduler->checkEvent() ;
      }
      CTimer::get("XIOS server").suspend() ;
    }

    void CServer::listenFinalize(void)
    {
      list<MPI_Comm>::iterator it, itr;
      int msg ;
      int flag ;

      for(it=interCommLeft.begin();it!=interCommLeft.end();it++)
      {
        MPI_Status status ;
        traceOff() ;
        MPI_Iprobe(0,0,*it,&flag,&status) ;
        traceOn() ;
        if (flag==true)
        {
          MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ;
          info(20)<<" CServer : Receive client finalize"<<endl ;
          // Forward the finalize message to the secondary servers (if any)
          for(itr=interCommRight.begin();itr!=interCommRight.end();itr++)
            MPI_Send(&msg,1,MPI_INT,0,0,*itr) ;
          MPI_Comm_free(&(*it));
          interCommLeft.erase(it) ;
          break ;
        }
      }

      if (interCommLeft.empty())
      {
        int size ;
        MPI_Comm_size(intraComm,&size) ;
        MPI_Request* requests= new MPI_Request[size-1] ;
        MPI_Status* status= new MPI_Status[size-1] ;

        // Notify the other server processes that all clients have finalized
        for(int i=1;i<size;i++) MPI_Isend(&msg,1,MPI_INT,i,4,intraComm,&requests[i-1]) ;
        MPI_Waitall(size-1,requests,status) ;

        finished=true ;
        delete [] requests ;
        delete [] status ;
      }
    }

    void CServer::listenRootFinalize()
    {
      int flag ;
      MPI_Status status ;
      int msg ;

      traceOff() ;
      MPI_Iprobe(0,4,intraComm, &flag, &status) ;
      traceOn() ;
      if (flag==true)
      {
        MPI_Recv(&msg,1,MPI_INT,0,4,intraComm,&status) ;
        finished=true ;
      }
    }

    void CServer::listenContext(void)
    {
      MPI_Status status ;
      int flag ;
      static void* buffer ;
      static MPI_Request request ;
      static bool recept=false ;
      int rank ;
      int count ;

      if (recept==false)
      {
        traceOff() ;
        MPI_Iprobe(MPI_ANY_SOURCE,1,CXios::globalComm, &flag, &status) ;
        traceOn() ;
        if (flag==true)
        {
          rank=status.MPI_SOURCE ;
          MPI_Get_count(&status,MPI_CHAR,&count) ;
          buffer=new char[count] ;
          MPI_Irecv((char*)buffer,count,MPI_CHAR,rank,1,CXios::globalComm,&request) ;
          recept=true ;
        }
      }
      else
      {
        traceOff() ;
        MPI_Test(&request,&flag,&status) ;
        traceOn() ;
        if (flag==true)
        {
          MPI_Get_count(&status,MPI_CHAR,&count) ;
          recvContextMessage((void*)buffer,count) ;
          delete [] (char*)buffer ;
          recept=false ;
        }
      }
    }

    void CServer::recvContextMessage(void* buff,int count)
    {
      static map<string,contextMessage> recvContextId;
      map<string,contextMessage>::iterator it ;
      CBufferIn buffer(buff,count) ;
      string id ;
      int clientLeader ;
      int nbMessage ;

      buffer>>id>>nbMessage>>clientLeader ;

      it=recvContextId.find(id) ;
      if (it==recvContextId.end())
      {
        contextMessage msg={0,0} ;
        pair<map<string,contextMessage>::iterator,bool> ret ;
        ret=recvContextId.insert(pair<string,contextMessage>(id,msg)) ;
        it=ret.first ;
      }
      it->second.nbRecv+=1 ;
      it->second.leaderRank+=clientLeader ;

      if (it->second.nbRecv==nbMessage)
      {
        int size ;
        MPI_Comm_size(intraComm,&size) ;
        MPI_Request* requests= new MPI_Request[size-1] ;
        MPI_Status* status= new MPI_Status[size-1] ;

        // Forward the registration message to the other server processes
        for(int i=1;i<size;i++)
          MPI_Isend(buff,count,MPI_CHAR,i,2,intraComm,&requests[i-1]) ;
        MPI_Waitall(size-1,requests,status) ;

        registerContext(buff,count,it->second.leaderRank) ;

        recvContextId.erase(it) ;
        delete [] requests ;
        delete [] status ;
      }
    }

    void CServer::listenRootContext(void)
    {
      MPI_Status status ;
      int flag ;
      static void* buffer ;
      static MPI_Request request ;
      static bool recept=false ;
      int rank ;
      int count ;
      const int root=0 ;

      if (recept==false)
      {
        traceOff() ;
        MPI_Iprobe(root,2,intraComm, &flag, &status) ;
        traceOn() ;
        if (flag==true)
        {
          MPI_Get_count(&status,MPI_CHAR,&count) ;
          buffer=new char[count] ;
          MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ;
          recept=true ;
        }
      }
      else
      {
        MPI_Test(&request,&flag,&status) ;
        if (flag==true)
        {
          MPI_Get_count(&status,MPI_CHAR,&count) ;
          registerContext(buffer,count) ;
          delete [] (char*)buffer ;
          recept=false ;
        }
      }
    }
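    /*!
     * \fn void CServer::registerContext(void* buff, int count, int leaderRank)
     * Creates and registers a new context on the server side.
     * Classical and primary servers initialize their own context-server part
     * (CContextServer) over an intercommunicator with the client leader;
     * a primary server additionally forwards the creation message to each
     * secondary-server pool and initializes a context-client part
     * (CContextClient) towards it; a secondary server initializes its
     * context-server part over the intercommunicator with the primary servers.
     */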
    void CServer::registerContext(void* buff, int count, int leaderRank)
    {
      string contextId;
      CBufferIn buffer(buff, count);
      buffer >> contextId;
      CContext* context;

      info(20) << "CServer : Register new Context : " << contextId << endl;

      if (contextList.find(contextId) != contextList.end())
        ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)",
              << "Context '" << contextId << "' has already been registered");

      context=CContext::create(contextId);
      contextList[contextId]=context;

      // Primary or classical server: initialize its own server (CContextServer)
      MPI_Comm inter;
      if (serverLevel < 2)
      {
        MPI_Comm contextInterComm;
        MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm);
        MPI_Intercomm_merge(contextInterComm,1,&inter);
        MPI_Barrier(inter);
        MPI_Comm_free(&inter);

        context->initServer(intraComm,contextInterComm);
        contextInterComms.push_back(contextInterComm);
      }
      // Secondary server: initialize its own server (CContextServer)
      else if (serverLevel == 2)
      {
        MPI_Comm_dup(interCommLeft.front(), &inter);
        contextInterComms.push_back(inter);
        context->initServer(intraComm, contextInterComms.back());
      }

      // Primary server: send create context message to secondary servers
      // and initialize its own client (CContextClient)
      if (serverLevel == 1)
      {
        int i = 0, size;
        CMessage msg;
        int messageSize;
        MPI_Comm_size(intraComm, &size) ;

        for (std::list<MPI_Comm>::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i)
        {
          StdString str = contextId +"_server_" + boost::lexical_cast<string>(i);
          msg<<str<<size<<rank_ ;
          messageSize = msg.size() ;
          char* buff = new char[messageSize] ;
          CBufferOut sendBuffer(buff,messageSize) ;
          sendBuffer<<msg ;

          // Send the creation message to the root process of secondary pool i
          // (its global rank follows from the pool layout set up at initialization)
          MPI_Send(buff, sendBuffer.count(), MPI_CHAR, serverLeader_ + serverSize_ - nbPools + i, 1, CXios::globalComm) ;

          MPI_Comm_dup(*it, &inter) ;
          contextInterComms.push_back(inter) ;
          MPI_Comm_dup(intraComm, &inter) ;
          contextIntraComms.push_back(inter) ;
          context->initClient(contextIntraComms.back(), contextInterComms.back()) ;
          delete [] buff ;
        }
        ++nbContexts_;
      }
    }

    void CServer::contextEventLoop(void)
    {
      bool isFinalized ;
      map<string,CContext*>::iterator it ;

      for(it=contextList.begin();it!=contextList.end();it++)
      {
        isFinalized=it->second->isFinalized();
        if (isFinalized)
        {
//          it->second->postFinalize();
          contextList.erase(it) ;
          break ;
        }
        else
          it->second->checkBuffersAndListen();
      }
    }

    //! Get rank of the current process
    int CServer::getRank()
    {
      return rank_;
    }

    /*!
     * Open a file specified by a suffix and an extension and use it for the given file buffer.
     * The file name will be suffix+rank+extension.
     *
     * \param fileName [in] prototype file name
     * \param ext [in] extension of the file
     * \param fb [in/out] the file buffer
     */
    void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)
    {
      StdStringStream fileNameClient;
      int numDigit = 0;
      int size = 0;
      int id;

      MPI_Comm_size(CXios::globalComm, &size);
      while (size)
      {
        size /= 10;
        ++numDigit;
      }

      if (!CXios::usingServer2)
        id = getRank();
      else
      {
        if (serverLevel == 1) id = rank_;
        else id = poolId;
      }

      fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext;
      fb->open(fileNameClient.str().c_str(), std::ios::out);
      if (!fb->is_open())
        ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)",
              << std::endl << "Can not open <" << fileNameClient.str() << "> file to write the server log(s).");
    }

    /*!
     * \brief Open a file stream to write the info logs
     * Open a file stream with a specific file name suffix+rank
     * to write the info logs.
     * \param fileName [in] prototype file name
     */
    void CServer::openInfoStream(const StdString& fileName)
    {
      std::filebuf* fb = m_infoStream.rdbuf();
      openStream(fileName, ".out", fb);

      info.write2File(fb);
      report.write2File(fb);
    }

    //! Write the info logs to standard output
    void CServer::openInfoStream()
    {
      info.write2StdOut();
      report.write2StdOut();
    }

    //! Close the info logs file if it is open
    void CServer::closeInfoStream()
    {
      if (m_infoStream.is_open()) m_infoStream.close();
    }

    /*!
     * \brief Open a file stream to write the error log
     * Open a file stream with a specific file name suffix+rank
     * to write the error log.
     * \param fileName [in] prototype file name
     */
    void CServer::openErrorStream(const StdString& fileName)
    {
      std::filebuf* fb = m_errorStream.rdbuf();
      openStream(fileName, ".err", fb);

      error.write2File(fb);
    }

    //! Write the error log to standard error output
    void CServer::openErrorStream()
    {
      error.write2StdErr();
    }

    //! Close the error log file if it is open
    void CServer::closeErrorStream()
    {
      if (m_errorStream.is_open()) m_errorStream.close();
    }
}