#include "xios_spl.hpp" #include "context_client.hpp" #include "context_server.hpp" #include "event_client.hpp" #include "buffer_out.hpp" #include "buffer_client.hpp" #include "type.hpp" #include "message.hpp" #include "event_client.hpp" #include "context.hpp" #include "mpi.hpp" #include "timer.hpp" #include "cxios.hpp" namespace xios { /*! \param [in] parent Pointer to context on client side \param [in] intraComm_ communicator of group client \param [in] interComm_ communicator of group server \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode) */ CContextClient::CContextClient(CContext* parent,MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) : mapBufferSize_(), parentServer(cxtSer) { context=parent ; intraComm=intraComm_ ; interComm=interComm_ ; MPI_Comm_rank(intraComm,&clientRank) ; MPI_Comm_size(intraComm,&clientSize) ; int flag ; MPI_Comm_test_inter(interComm,&flag) ; if (flag) MPI_Comm_remote_size(interComm,&serverSize); else MPI_Comm_size(interComm,&serverSize) ; timeLine=0 ; } /*! In case of attached mode, the current context must be reset to context for client \param [in] event Event sent to server */ void CContextClient::sendEvent(CEventClient& event) { list::iterator itServer ; list ranks ; list sizes ; list::iterator itSize ; ranks=event.getRanks() ; if (! event.isEmpty()) { sizes=event.getSizes() ; CMessage msg ; msg<<*(sizes.begin())<::iterator it=sizes.begin();it!=sizes.end();it++) *it+=msg.size() ; list buffList=getBuffers(ranks,sizes) ; list::iterator it ; for(it=buffList.begin(),itSize=sizes.begin();it!=buffList.end();++it,++itSize) { **it<<*itSize<hasServer) if (0 != parentServer) { waitEvent(ranks); CContext::setCurrent(context->getId()); } timeLine++ ; } /*! Special function to setup size of buffer not only on client side but also on server side corresponding to the connection */ void CContextClient::sendBufferSizeEvent() { std::map::iterator it, itE; std::map::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(); if (itMap == iteMap) ERROR("CBufferOut* CContextClient::sendBufferSizeEvent() ;", <<"No information about server buffer, that should not happen..."); for (; itMap != iteMap; ++itMap) { if (buffers.end() == buffers.find(itMap->first)) newBuffer(itMap->first); } CBufferOut* bufOut(NULL); itE = buffers.end(); for (it = buffers.begin(); it != itE; ++it) { bufOut = (it->second)->getBuffer(sizeof(StdSize)); bufOut->put(mapBufferSize_[it->first]); // Stupid C++ (it->second)->checkBuffer(); } } /*! If client is also server (attached mode), after sending event, it should process right away the incoming event. \param [in] ranks list rank of server connected this client */ void CContextClient::waitEvent(list& ranks) { // context->server->setPendingEvent() ; // while(checkBuffers(ranks)) // { // context->server->listen() ; // context->server->checkPendingRequest() ; // } // // while(context->server->hasPendingEvent()) // { // context->server->eventLoop() ; // } parentServer->server->setPendingEvent() ; while(checkBuffers(ranks)) { parentServer->server->listen() ; parentServer->server->checkPendingRequest() ; } while(parentServer->server->hasPendingEvent()) { parentServer->server->eventLoop() ; } } /*! Setup buffer for each connection to server and verify their state to put content into them \param [in] serverList list of rank of connected server \param [in] sizeList size of message corresponding to each connection \return List of buffer input which event can be placed */ list CContextClient::getBuffers(list& serverList, list& sizeList) { list::iterator itServer,itSize ; list bufferList ; map::iterator it ; list::iterator itBuffer ; list retBuffer ; bool free ; for(itServer=serverList.begin();itServer!=serverList.end();itServer++) { it=buffers.find(*itServer) ; if (it==buffers.end()) { newBuffer(*itServer) ; it=buffers.find(*itServer) ; } bufferList.push_back(it->second) ; } free=false ; CTimer::get("Blocking time").resume(); while(!free) { free=true ; for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++) { (*itBuffer)->checkBuffer() ; free&=(*itBuffer)->isBufferFree(*itSize) ; } } CTimer::get("Blocking time").suspend(); for(itBuffer=bufferList.begin(),itSize=sizeList.begin(); itBuffer!=bufferList.end();itBuffer++,itSize++) { retBuffer.push_back((*itBuffer)->getBuffer(*itSize)) ; } return retBuffer ; } /*! Make a new buffer for a certain connection to server with specific rank \param [in] rank rank of connected server */ void CContextClient::newBuffer(int rank) { buffers[rank]=new CClientBuffer(interComm,rank, mapBufferSize_[rank]) ; } /*! Verify state of buffers. Buffer is under pending state if there is no message on it \return state of buffers, pending(true), ready(false) */ bool CContextClient::checkBuffers(void) { map::iterator itBuff ; bool pending=false ; for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) pending|=itBuff->second->checkBuffer() ; return pending ; } //! Release all buffers void CContextClient::releaseBuffers(void) { map::iterator itBuff ; for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) delete itBuff->second ; } /*! Verify state of buffers corresponding to a connection \param [in] ranks list rank of server to which client connects to \return state of buffers, pending(true), ready(false) */ bool CContextClient::checkBuffers(list& ranks) { list::iterator it ; bool pending=false ; for(it=ranks.begin();it!=ranks.end();it++) pending|=buffers[*it]->checkBuffer() ; return pending ; } /*! Set buffer size for each connection \param [in] mapSize mapping rank of connected server to size of allocated buffer */ void CContextClient::setBufferSize(const std::map& mapSize) { mapBufferSize_ = mapSize; sendBufferSizeEvent(); } /*! Get leading server in the group of connected server \return rank of leading server */ int CContextClient::getServerLeader(void) { int clientByServer=clientSize/serverSize ; int remain=clientSize%serverSize ; if (clientRank<(clientByServer+1)*remain) { return clientRank/(clientByServer+1) ; } else { int rank=clientRank-(clientByServer+1)*remain ; int nbServer=serverSize-remain ; return remain+rank/clientByServer ; } } /*! Check if client connects to leading server \return connected(true), not connected (false) */ bool CContextClient::isServerLeader(void) { int clientByServer=clientSize/serverSize ; int remain=clientSize%serverSize ; if (clientRank<(clientByServer+1)*remain) { if (clientRank%(clientByServer+1)==0) return true ; else return false ; } else { int rank=clientRank-(clientByServer+1)*remain ; int nbServer=serverSize-remain ; if (rank%clientByServer==0) return true ; else return false ; } } /*! Finalize context client and do some reports */ void CContextClient::finalize(void) { map::iterator itBuff ; bool stop=true ; CEventClient event(CContext::GetType(),CContext::EVENT_ID_CONTEXT_FINALIZE) ; if (isServerLeader()) { CMessage msg ; event.push(getServerLeader(),1,msg) ; sendEvent(event) ; } else sendEvent(event) ; CTimer::get("Blocking time").resume(); while(stop) { checkBuffers() ; stop=false ; for(itBuff=buffers.begin();itBuff!=buffers.end();itBuff++) stop|=itBuff->second->hasPendingRequest() ; } CTimer::get("Blocking time").suspend(); std::map::const_iterator itbMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(), itMap; StdSize totalBuf = 0; for (itMap = itbMap; itMap != iteMap; ++itMap) { report(10)<< " Memory report : Context <"<getId()<<"> : client side : memory used for buffer of each connection to server" << endl << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; totalBuf += itMap->second; } report(0)<< " Memory report : Context <"<getId()<<"> : client side : total memory used for buffer "<