#include "xmlioserver_spl.hpp" #include "cxios.hpp" #include "server.hpp" #include "type.hpp" #include "context.hpp" #include "object_template_impl.hpp" #include "tree_manager.hpp" #include "oasis_cinterface.hpp" #include #include #include namespace xios { MPI_Comm CServer::intraComm ; list CServer::interComm ; bool CServer::isRoot ; int CServer::rank ; map CServer::contextList ; bool CServer::finished=false ; bool CServer::is_MPI_Initialized ; void CServer::initialize(void) { int initialized ; MPI_Initialized(&initialized) ; if (initialized) is_MPI_Initialized=true ; else is_MPI_Initialized=false ; // Not using OASIS if (!CXios::usingOasis) { if (!is_MPI_Initialized) { int argc=0; char** argv=NULL; MPI_Init(&argc,&argv) ; } boost::hash hashString ; unsigned long hashServer=hashString(CXios::xiosCodeId) ; unsigned long* hashAll ; int rank ; int size ; int myColor ; int i,c ; MPI_Comm newComm ; MPI_Comm_size(CXios::globalComm,&size) ; MPI_Comm_rank(CXios::globalComm,&rank); hashAll=new unsigned long[size] ; MPI_Allgather(&hashServer,1,MPI_LONG,hashAll,1,MPI_LONG,CXios::globalComm) ; map colors ; map leaders ; map::iterator it ; for(i=0,c=0;ifirst!=hashServer) { clientLeader=it->second ; MPI_Intercomm_create(intraComm,0,CXios::globalComm,clientLeader,0,&newComm) ; interComm.push_back(newComm) ; } } delete [] hashAll ; } // using OASIS else { int rank ,size; if (!is_MPI_Initialized) oasis_init(CXios::xiosCodeId) ; oasis_get_localcomm(intraComm) ; MPI_Comm_rank(intraComm,&rank) ; MPI_Comm_size(intraComm,&size) ; string codesId=CXios::getin("oasis_codes_id") ; vector splitted ; boost::split( splitted, codesId, boost::is_space(), boost::token_compress_on ) ; vector::iterator it ; MPI_Comm newComm ; int globalRank ; MPI_Comm_rank(CXios::globalComm,&globalRank); for(it=splitted.begin();it!=splitted.end();it++) { oasis_get_intercomm(newComm,*it) ; if (rank==0) MPI_Send(&globalRank,1,MPI_INT,0,0,newComm) ; MPI_Comm_remote_size(newComm,&size); interComm.push_back(newComm) ; } } int rank ; MPI_Comm_rank(intraComm,&rank) ; if (rank==0) isRoot=true; else isRoot=false; eventLoop() ; finalize() ; } void CServer::finalize(void) { if (!is_MPI_Initialized) { if (CXios::usingOasis) oasis_finalize(); else MPI_Finalize() ; } } void CServer::eventLoop(void) { bool stop=false ; while(!stop) { if (isRoot) { listenContext(); if (!finished) listenFinalize() ; } else { listenRootContext(); if (!finished) listenRootFinalize() ; } contextEventLoop() ; if (finished && contextList.empty()) stop=true ; } } void CServer::listenFinalize(void) { list::iterator it; int msg ; int flag ; for(it=interComm.begin();it!=interComm.end();it++) { MPI_Status status ; MPI_Iprobe(0,0,*it,&flag,&status) ; if (flag==true) { MPI_Recv(&msg,1,MPI_INT,0,0,*it,&status) ; info(20)<<" CServer : Receive client finalize"< recvContextId ; map::iterator it ; CBufferIn buffer(buff,count) ; string id ; int clientLeader ; int nbMessage ; buffer>>id>>nbMessage>>clientLeader ; it=recvContextId.find(id) ; if (it==recvContextId.end()) { contextMessage msg={0,0} ; pair::iterator,bool> ret ; ret=recvContextId.insert(pair(id,msg)) ; it=ret.first ; } it->second.nbRecv+=1 ; it->second.leaderRank+=clientLeader ; if (it->second.nbRecv==nbMessage) { int size ; MPI_Comm_size(intraComm,&size) ; MPI_Request* requests= new MPI_Request[size-1] ; MPI_Status* status= new MPI_Status[size-1] ; for(int i=1;isecond.leaderRank) ; recvContextId.erase(it) ; delete [] requests ; delete [] status ; } } void CServer::listenRootContext(void) { MPI_Status status ; int flag ; static void* buffer ; static MPI_Request request ; static bool recept=false ; int rank ; int count ; const int root=0 ; if (recept==false) { MPI_Iprobe(root,2,intraComm, &flag, &status) ; if (flag==true) { MPI_Get_count(&status,MPI_CHAR,&count) ; buffer=new char[count] ; MPI_Irecv(buffer,count,MPI_CHAR,root,2,intraComm,&request) ; recept=true ; } } else { MPI_Test(&request,&flag,&status) ; if (flag==true) { MPI_Get_count(&status,MPI_CHAR,&count) ; registerContext(buffer,count) ; delete [] buffer ; recept=false ; } } } void CServer::registerContext(void* buff,int count, int leaderRank) { string contextId; CBufferIn buffer(buff,count) ; buffer>>contextId ; MPI_Comm contextIntercomm ; MPI_Intercomm_create(intraComm,0,CXios::globalComm,leaderRank,10+leaderRank,&contextIntercomm) ; info(20)<<"CServer : Register new Context : "< context=CTreeManager::CreateContext(contextId) ; contextList[contextId]=context.get() ; context->initServer(intraComm,contextIntercomm) ; } void CServer::contextEventLoop(void) { bool finished ; map::iterator it ; for(it=contextList.begin();it!=contextList.end();it++) { finished=it->second->eventLoop() ; if (finished) { contextList.erase(it) ; break ; } } } }