#include "globalScopeData.hpp" #include "xios_spl.hpp" #include "cxios.hpp" #include "server.hpp" #include "client.hpp" #include "type.hpp" #include "context.hpp" #include "object_template.hpp" #include "oasis_cinterface.hpp" #include #include #include "mpi.hpp" #include "tracer.hpp" #include "timer.hpp" #include "event_scheduler.hpp" using namespace ep_lib; namespace xios { MPI_Comm CServer::intraComm ; std::list CServer::interCommLeft ; std::list CServer::interCommRight ; std::list CServer::contextInterComms; std::list CServer::contextIntraComms; int CServer::serverLevel = 0 ; int CServer::nbContexts = 0; bool CServer::isRoot = false ; int CServer::rank_ = INVALID_RANK; StdOFStream CServer::m_infoStream; StdOFStream CServer::m_errorStream; map CServer::contextList ; vector CServer::sndServerGlobalRanks; bool CServer::finished=false ; bool CServer::is_MPI_Initialized ; CEventScheduler* CServer::eventScheduler = 0; //--------------------------------------------------------------- /*! * \fn void CServer::initialize(void) * Creates intraComm for each possible type of servers (classical, primary or secondary). * (For now the assumption is that there is one proc per secondary server pool.) * Creates interComm and stores them into the following lists: * classical server -- interCommLeft * primary server -- interCommLeft and interCommRight * secondary server -- interCommLeft for each pool. * IMPORTANT: CXios::usingServer2 should NOT be used beyond this function. Use CServer::serverLevel instead. */ void CServer::initialize(void) { //int initialized ; //MPI_Initialized(&initialized) ; //if (initialized) is_MPI_Initialized=true ; //else is_MPI_Initialized=false ; int rank ; // Not using OASIS if (!CXios::usingOasis) { //if (!is_MPI_Initialized) //{ // MPI_Init(NULL, NULL); //} CTimer::get("XIOS").resume() ; boost::hash hashString ; unsigned long hashServer = hashString(CXios::xiosCodeId); unsigned long* hashAll ; unsigned long* srvLevelAll ; int size ; int myColor ; int i,c ; MPI_Comm newComm; MPI_Comm_size(CXios::globalComm, &size) ; MPI_Comm_rank(CXios::globalComm, &rank_); hashAll=new unsigned long[size] ; MPI_Allgather(&hashServer, 1, MPI_LONG, hashAll, 1, MPI_LONG, CXios::globalComm) ; map colors ; map leaders ; map::iterator it ; // (1) Establish client leaders, distribute processes between two server levels std::vector srvRanks; for(i=0,c=0;i reqNbProc || nbPools < 1) { error(0)<<"WARNING: void CServer::initialize(void)"<= firstSndSrvRank) { if (rank_ == srvRanks[i]) { serverLevel=2; } poolLeader += procsPerPool; if (remainder != 0) { ++poolLeader; --remainder; } //*********** (2) Comment out the two lines below to set one process per pool // if (poolLeader < srvRanks.size()) // sndServerGlobalRanks.push_back(srvRanks[poolLeader]); //*********** (3) Uncomment the line below to set one process per pool sndServerGlobalRanks.push_back(srvRanks[i]); } else { if (rank_ == srvRanks[i]) serverLevel=1; } } if (serverLevel==2) { #pragma omp critical (_output) info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <= sndServerGlobalRanks[i]) { if ( i == sndServerGlobalRanks.size()-1) { myColor = colors.size() + sndServerGlobalRanks[i]; } else if (rank_< sndServerGlobalRanks[i+1]) { myColor = colors.size() + sndServerGlobalRanks[i]; break; } } } } } } // (2) Create intraComm if (serverLevel != 2) myColor=colors[hashServer]; MPI_Comm_split(CXios::globalComm, myColor, rank_, &intraComm) ; // (3) Create interComm if (serverLevel == 0) { int clientLeader; for(it=leaders.begin();it!=leaders.end();it++) { if (it->first!=hashServer) { clientLeader=it->second ; int intraCommSize, intraCommRank ; MPI_Comm_size(intraComm,&intraCommSize) ; MPI_Comm_rank(intraComm,&intraCommRank) ; #pragma omp critical (_output) { info(50)<<"intercommCreate::server (classical mode) "<first != hashServer) { clientLeader=it->second ; int intraCommSize, intraCommRank ; MPI_Comm_size(intraComm, &intraCommSize) ; MPI_Comm_rank(intraComm, &intraCommRank) ; #pragma omp critical (_output) { info(50)<<"intercommCreate::server (server level 1) "< reqNbProc || nbPools < 1) { error(0)<<"WARNING: void CServer::initialize(void)"<= firstSndSrvRank) { if (globalRank == srvGlobalRanks[i]) { serverLevel=2; } poolLeader += procsPerPool; if (remainder != 0) { ++poolLeader; --remainder; } //*********** (2) Comment out the two lines below to set one process per pool // if (poolLeader < size) // sndServerGlobalRanks.push_back(srvGlobalRanks[poolLeader]); //*********** (3) Uncomment the line below to set one process per pool sndServerGlobalRanks.push_back(srvGlobalRanks[i]); } else { if (globalRank == srvGlobalRanks[i]) serverLevel=1; } } if (serverLevel==2) { info(50)<<"The number of secondary server pools is "<< sndServerGlobalRanks.size() <= sndServerGlobalRanks[i]) { if (i == sndServerGlobalRanks.size()-1) { myColor = sndServerGlobalRanks[i]; } else if (globalRank< sndServerGlobalRanks[i+1]) { myColor = sndServerGlobalRanks[i]; break; } } } } if (serverLevel != 2) myColor=0; MPI_Comm_split(localComm, myColor, rank_, &intraComm) ; } } string codesId=CXios::getin("oasis_codes_id") ; vector splitted ; boost::split( splitted, codesId, boost::is_any_of(","), boost::token_compress_on ) ; vector::iterator it ; MPI_Comm newComm ; int globalRank ; MPI_Comm_rank(CXios::globalComm,&globalRank); // (2) Create interComms with models for(it=splitted.begin();it!=splitted.end();it++) { oasis_get_intercomm(newComm,*it) ; if ( serverLevel == 0 || serverLevel == 1) { interCommLeft.push_back(newComm) ; if (rank_==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; } } // (3) Create interComms between primary and secondary servers int intraCommSize, intraCommRank ; MPI_Comm_size(intraComm,&intraCommSize) ; MPI_Comm_rank(intraComm, &intraCommRank) ; if (serverLevel == 1) { for (int i = 0; i < sndServerGlobalRanks.size(); ++i) { int srvSndLeader = sndServerGlobalRanks[i]; info(50)<<"intercommCreate::client (server level 1) "<::iterator it = contextInterComms.begin(); it != contextInterComms.end(); it++) MPI_Comm_free(&(*it)); for (std::list::iterator it = contextIntraComms.begin(); it != contextIntraComms.end(); it++) MPI_Comm_free(&(*it)); // for (std::list::iterator it = interComm.begin(); it != interComm.end(); it++) // MPI_Comm_free(&(*it)); // for (std::list::iterator it = interCommLeft.begin(); it != interCommLeft.end(); it++) // MPI_Comm_free(&(*it)); for (std::list::iterator it = interCommRight.begin(); it != interCommRight.end(); it++) MPI_Comm_free(&(*it)); MPI_Comm_free(&intraComm); if (!is_MPI_Initialized) { if (CXios::usingOasis) oasis_finalize(); //else MPI_Finalize() ; } report(0)<<"Performance report : Time spent for XIOS : "<checkEvent() ; } CTimer::get("XIOS server").suspend() ; } void CServer::listenFinalize(void) { list::iterator it, itr; int msg ; int flag ; for(it=interCommLeft.begin();it!=interCommLeft.end();it++) { MPI_Status status ; traceOff() ; MPI_Iprobe(0,0,*it,&flag,&status) ; traceOn() ; if (flag==true) { MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ; info(20)<<" CServer : Receive client finalize"< recvContextId; map::iterator it ; CBufferIn buffer(buff,count) ; string id ; int clientLeader ; int nbMessage ; buffer>>id>>nbMessage>>clientLeader ; it=recvContextId.find(id) ; if (it==recvContextId.end()) { contextMessage msg={0,0} ; pair::iterator,bool> ret ; ret=recvContextId.insert(pair(id,msg)) ; it=ret.first ; } it->second.nbRecv+=1 ; it->second.leaderRank+=clientLeader ; if (it->second.nbRecv==nbMessage) { int size ; MPI_Comm_size(intraComm,&size) ; // MPI_Request* requests= new MPI_Request[size-1] ; // MPI_Status* status= new MPI_Status[size-1] ; MPI_Request* requests= new MPI_Request[size] ; MPI_Status* status= new MPI_Status[size] ; CMessage msg ; msg<second.leaderRank; int messageSize=msg.size() ; void * sendBuff = new char[messageSize] ; CBufferOut sendBuffer(sendBuff,messageSize) ; sendBuffer< buffers; static std::vector requests ; static std::vector counts ; static std::vector isEventRegistered ; static std::vector isEventQueued ; MPI_Request request; int rank ; const int root=0 ; boost::hash hashString; size_t hashId = hashString("RegisterContext"); // (1) Receive context id from the root, save it into a buffer traceOff() ; MPI_Iprobe(root,2,intraComm, &flag, &status) ; traceOn() ; if (flag==true) { int my_count; counts.push_back(0); MPI_Get_count(&status,MPI_CHAR,&(counts.back())) ; buffers.push_back(new char[counts.back()]) ; requests.push_back(request); MPI_Irecv((void*)(buffers.back()),counts.back(),MPI_CHAR,root,2,intraComm,&(requests.back())) ; isEventRegistered.push_back(false); isEventQueued.push_back(false); nbContexts++; } for (int ctxNb = 0; ctxNb < nbContexts; ctxNb++ ) { // (2) If context id is received, register an event MPI_Test(&requests[ctxNb],&flag,&status) ; if (flag==true && !isEventRegistered[ctxNb]) { eventScheduler->registerEvent(ctxNb,hashId); isEventRegistered[ctxNb] = true; } // (3) If event has been scheduled, call register context if (eventScheduler->queryEvent(ctxNb,hashId) && !isEventQueued[ctxNb]) { registerContext(buffers[ctxNb],counts[ctxNb]) ; isEventQueued[ctxNb] = true; delete [] buffers[ctxNb] ; } } } void CServer::registerContext(void* buff, int count, int leaderRank) { string contextId; CBufferIn buffer(buff, count); // buffer >> contextId; buffer >> contextId>>leaderRank; CContext* context; info(20) << "CServer : Register new Context : " << contextId << endl; if (contextList.find(contextId) != contextList.end()) ERROR("void CServer::registerContext(void* buff, int count, int leaderRank)", << "Context '" << contextId << "' has already been registred"); context=CContext::create(contextId); contextList[contextId]=context; // Primary or classical server: create communication channel with a client // (1) create interComm (with a client) // (2) initialize client and server (contextClient and contextServer) MPI_Comm inter; if (serverLevel < 2) { MPI_Comm contextInterComm; MPI_Intercomm_create(intraComm, 0, CXios::globalComm, leaderRank, 10+leaderRank, &contextInterComm); MPI_Intercomm_merge(contextInterComm,1,&inter); MPI_Barrier(inter); context->initServer(intraComm,contextInterComm); contextInterComms.push_back(contextInterComm); MPI_Comm_free(&inter); } // Secondary server: create communication channel with a primary server // (1) duplicate interComm with a primary server // (2) initialize client and server (contextClient and contextServer) // Remark: in the case of the secondary server there is no need to create an interComm calling MPI_Intercomm_create, // because interComm of CContext is defined on the same processes as the interComm of CServer. // So just duplicate it. else if (serverLevel == 2) { MPI_Comm_dup(interCommLeft.front(), &inter); contextInterComms.push_back(inter); context->initServer(intraComm, contextInterComms.back()); } // Primary server: // (1) send create context message to secondary servers // (2) initialize communication channels with secondary servers (create contextClient and contextServer) if (serverLevel == 1) { int i = 0, size; MPI_Comm_size(intraComm, &size) ; for (std::list::iterator it = interCommRight.begin(); it != interCommRight.end(); it++, ++i) { StdString str = contextId +"_server_" + boost::lexical_cast(i); CMessage msg; int messageSize; msg<initClient(contextIntraComms.back(), contextInterComms.back()) ; delete [] buff ; } } } void CServer::contextEventLoop(bool enableEventsProcessing /*= true*/) { bool isFinalized ; map::iterator it ; for(it=contextList.begin();it!=contextList.end();it++) { isFinalized=it->second->isFinalized(); if (isFinalized) { contextList.erase(it) ; break ; } else it->second->checkBuffersAndListen(enableEventsProcessing); } } //! Get rank of the current process in the intraComm int CServer::getRank() { int rank; MPI_Comm_rank(intraComm,&rank); return rank; } vector& CServer::getSecondaryServerGlobalRanks() { return sndServerGlobalRanks; } /*! * Open a file specified by a suffix and an extension and use it for the given file buffer. * The file name will be suffix+rank+extension. * * \param fileName[in] protype file name * \param ext [in] extension of the file * \param fb [in/out] the file buffer */ void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb) { StdStringStream fileNameClient; int numDigit = 0; int size = 0; int id; MPI_Comm_size(CXios::globalComm, &size); while (size) { size /= 10; ++numDigit; } id = rank_; //getRank(); fileNameClient << fileName << "_" << std::setfill('0') << std::setw(numDigit) << id << ext; fb->open(fileNameClient.str().c_str(), std::ios::out); if (!fb->is_open()) ERROR("void CServer::openStream(const StdString& fileName, const StdString& ext, std::filebuf* fb)", << std::endl << "Can not open <" << fileNameClient << "> file to write the server log(s)."); } /*! * \brief Open a file stream to write the info logs * Open a file stream with a specific file name suffix+rank * to write the info logs. * \param fileName [in] protype file name */ void CServer::openInfoStream(const StdString& fileName) { std::filebuf* fb = m_infoStream.rdbuf(); openStream(fileName, ".out", fb); info.write2File(fb); report.write2File(fb); } //! Write the info logs to standard output void CServer::openInfoStream() { info.write2StdOut(); report.write2StdOut(); } //! Close the info logs file if it opens void CServer::closeInfoStream() { if (m_infoStream.is_open()) m_infoStream.close(); } /*! * \brief Open a file stream to write the error log * Open a file stream with a specific file name suffix+rank * to write the error log. * \param fileName [in] protype file name */ void CServer::openErrorStream(const StdString& fileName) { std::filebuf* fb = m_errorStream.rdbuf(); openStream(fileName, ".err", fb); error.write2File(fb); } //! Write the error log to standard error output void CServer::openErrorStream() { error.write2StdErr(); } //! Close the error log file if it opens void CServer::closeErrorStream() { if (m_errorStream.is_open()) m_errorStream.close(); } }