#include "xios_spl.hpp" #include "context_client.hpp" #include "context_server.hpp" #include "event_client.hpp" #include "buffer_out.hpp" #include "buffer_client.hpp" #include "type.hpp" #include "message.hpp" #include "event_client.hpp" #include "context.hpp" #include "mpi.hpp" #include "timer.hpp" #include "cxios.hpp" namespace xios { /*! \param [in] parent Pointer to context on client side \param [in] intraComm_ communicator of group client \param [in] interComm_ communicator of group server \cxtSer [in] cxtSer Pointer to context of server side. (It is only used on case of attached mode) */ CContextClient::CContextClient(CContext* parent, MPI_Comm intraComm_, MPI_Comm interComm_, CContext* cxtSer) : mapBufferSize_(), parentServer(cxtSer) { context = parent; intraComm = intraComm_; interComm = interComm_; MPI_Comm_rank(intraComm, &clientRank); MPI_Comm_size(intraComm, &clientSize); int flag; MPI_Comm_test_inter(interComm, &flag); if (flag) MPI_Comm_remote_size(interComm, &serverSize); else MPI_Comm_size(interComm, &serverSize); if (clientSize < serverSize) { int serverByClient = serverSize / clientSize; int remain = serverSize % clientSize; int rankStart = serverByClient * clientRank; if (clientRank < remain) { serverByClient++; rankStart += clientRank; } else rankStart += remain; for (int i = 0; i < serverByClient; i++) ranksServerLeader.push_back(rankStart + i); } else { int clientByServer = clientSize / serverSize; int remain = clientSize % serverSize; if (clientRank < (clientByServer + 1) * remain) { if (clientRank % (clientByServer + 1) == 0) ranksServerLeader.push_back(clientRank / (clientByServer + 1)); } else { int rank = clientRank - (clientByServer + 1) * remain; if (rank % clientByServer == 0) ranksServerLeader.push_back(remain + rank / clientByServer); } } timeLine = 0; } /*! In case of attached mode, the current context must be reset to context for client \param [in] event Event sent to server */ void CContextClient::sendEvent(CEventClient& event) { list::iterator itServer; list ranks; list sizes; list::iterator itSize; ranks = event.getRanks(); if (!event.isEmpty()) { sizes = event.getSizes(); CMessage msg; msg << *(sizes.begin()) << timeLine; for (list::iterator it = sizes.begin(); it != sizes.end(); it++) *it += msg.size(); list buffList = getBuffers(ranks, sizes); list::iterator it; for (it = buffList.begin(), itSize = sizes.begin(); it != buffList.end(); ++it, ++itSize) { **it << *itSize << timeLine; } event.send(buffList); checkBuffers(ranks); } if (0 != parentServer) // attached mode { waitEvent(ranks); CContext::setCurrent(context->getId()); } timeLine++; } /*! Special function to setup size of buffer not only on client side but also on server side corresponding to the connection */ void CContextClient::sendBufferSizeEvent() { std::map::iterator it, itE; std::map::const_iterator itMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(); if (itMap == iteMap) ERROR("void CContextClient::sendBufferSizeEvent()", <<"No information about server buffer, that should not happen..."); for (; itMap != iteMap; ++itMap) { if (buffers.end() == buffers.find(itMap->first)) newBuffer(itMap->first); } CBufferOut* bufOut(NULL); itE = buffers.end(); for (it = buffers.begin(); it != itE; ++it) { bufOut = (it->second)->getBuffer(sizeof(StdSize)); bufOut->put(mapBufferSize_[it->first]); // Stupid C++ (it->second)->checkBuffer(); } if (0 != parentServer) // attached mode { while (checkBuffers()) { parentServer->server->listen(); } CContext::setCurrent(context->getId()); } } /*! If client is also server (attached mode), after sending event, it should process right away the incoming event. \param [in] ranks list rank of server connected this client */ void CContextClient::waitEvent(list& ranks) { parentServer->server->setPendingEvent(); while (checkBuffers(ranks)) { parentServer->server->listen(); parentServer->server->checkPendingRequest(); } while (parentServer->server->hasPendingEvent()) { parentServer->server->eventLoop(); } } /*! Setup buffer for each connection to server and verify their state to put content into them \param [in] serverList list of rank of connected server \param [in] sizeList size of message corresponding to each connection \return List of buffer input which event can be placed */ list CContextClient::getBuffers(list& serverList, list& sizeList) { list::iterator itServer, itSize; list bufferList; map::iterator it; list::iterator itBuffer; list retBuffer; bool free; for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) { it = buffers.find(*itServer); if (it == buffers.end()) { newBuffer(*itServer); it = buffers.find(*itServer); } bufferList.push_back(it->second); } free = false; CTimer::get("Blocking time").resume(); while (!free) { free = true; for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) { (*itBuffer)->checkBuffer(); free &= (*itBuffer)->isBufferFree(*itSize); } if (!free) context->server->listen(); } CTimer::get("Blocking time").suspend(); for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) { retBuffer.push_back((*itBuffer)->getBuffer(*itSize)); } return retBuffer; } /*! Make a new buffer for a certain connection to server with specific rank \param [in] rank rank of connected server */ void CContextClient::newBuffer(int rank) { buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank]); } /*! Verify state of buffers. Buffer is under pending state if there is no message on it \return state of buffers, pending(true), ready(false) */ bool CContextClient::checkBuffers(void) { map::iterator itBuff; bool pending = false; for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) pending |= itBuff->second->checkBuffer(); return pending; } //! Release all buffers void CContextClient::releaseBuffers(void) { map::iterator itBuff; for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) delete itBuff->second; } /*! Verify state of buffers corresponding to a connection \param [in] ranks list rank of server to which client connects to \return state of buffers, pending(true), ready(false) */ bool CContextClient::checkBuffers(list& ranks) { list::iterator it; bool pending = false; for (it = ranks.begin(); it != ranks.end(); it++) pending |= buffers[*it]->checkBuffer(); return pending; } /*! Set buffer size for each connection \param [in] mapSize mapping rank of connected server to size of allocated buffer */ void CContextClient::setBufferSize(const std::map& mapSize) { mapBufferSize_ = mapSize; sendBufferSizeEvent(); } /*! Get leading server in the group of connected server \return ranks of leading servers */ const std::list& CContextClient::getRanksServerLeader(void) const { return ranksServerLeader; } /*! Check if client connects to leading server \return connected(true), not connected (false) */ bool CContextClient::isServerLeader(void) const { return !ranksServerLeader.empty(); } /*! Finalize context client and do some reports */ void CContextClient::finalize(void) { map::iterator itBuff; bool stop = true; CEventClient event(CContext::GetType(), CContext::EVENT_ID_CONTEXT_FINALIZE); if (isServerLeader()) { CMessage msg; const std::list& ranks = getRanksServerLeader(); for (std::list::const_iterator itRank = ranks.begin(), itRankEnd = ranks.end(); itRank != itRankEnd; ++itRank) event.push(*itRank, 1, msg); sendEvent(event); } else sendEvent(event); CTimer::get("Blocking time").resume(); while (stop) { checkBuffers(); stop = false; for (itBuff = buffers.begin(); itBuff != buffers.end(); itBuff++) stop |= itBuff->second->hasPendingRequest(); } CTimer::get("Blocking time").suspend(); std::map::const_iterator itbMap = mapBufferSize_.begin(), iteMap = mapBufferSize_.end(), itMap; StdSize totalBuf = 0; for (itMap = itbMap; itMap != iteMap; ++itMap) { report(10) << " Memory report : Context <" << context->getId() << "> : client side : memory used for buffer of each connection to server" << endl << " +) To server with rank " << itMap->first << " : " << itMap->second << " bytes " << endl; totalBuf += itMap->second; } report(0) << " Memory report : Context <" << context->getId() << "> : client side : total memory used for buffer " << totalBuf << " bytes" << endl; releaseBuffers(); } }