#include "xios_spl.hpp" #include "exception.hpp" #include "log.hpp" #include "buffer_out.hpp" #include "buffer_client.hpp" #include "cxios.hpp" #include "mpi.hpp" #include "tracer.hpp" namespace xios { size_t CClientBuffer::maxRequestSize = 0; CClientBuffer::CClientBuffer(MPI_Comm interComm, vector& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize) : interComm(interComm) , clientRank_(clientRank) , serverRank(serverRank) , bufferSize(bufferSize) , estimatedMaxEventSize(estimatedMaxEventSize) , maxEventSize(0) , current(0) , count(0) , pending(false) , hasWindows(false) , windows_(windows) { if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ; else hasWindows=true ; MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ; MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ; buffer[0] = bufferHeader[0]+headerSize ; buffer[1] = bufferHeader[1]+headerSize ; firstTimeLine[0]=(size_t*)bufferHeader[0] ; firstTimeLine[1]=(size_t*)bufferHeader[1] ; bufferCount[0]=(size_t*)bufferHeader[0] +1 ; bufferCount[1]=(size_t*)bufferHeader[1] +1 ; control[0]=(size_t*)bufferHeader[0] +2 ; control[1]=(size_t*)bufferHeader[1] +2 ; finalize[0]=(size_t*)bufferHeader[0] +3 ; finalize[1]=(size_t*)bufferHeader[1] +3 ; *firstTimeLine[0]=0 ; *firstTimeLine[1]=0 ; *bufferCount[0]=0 ; *bufferCount[1]=0 ; *control[0]=0 ; *control[1]=0 ; *finalize[0]=0 ; *finalize[1]=0 ; winState[0]=false ; winState[1]=false ; if (hasWindows) { MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ; MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ; MPI_Group group ; int groupSize,groupRank ; MPI_Win_get_group(windows_[0], &group) ; MPI_Group_size(group, &groupSize) ; MPI_Group_rank(group, &groupRank) ; if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "< bufferSize) ERROR("bool CClientBuffer::isBufferFree(StdSize size)", << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl); if (size > maxEventSize) { maxEventSize = size; if (size > estimatedMaxEventSize) error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl; if (size > maxRequestSize) maxRequestSize = size; } count=*bufferCount[current] ; return (size <= remain()); } CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size) { if (size <= remain()) { retBuffer->realloc(buffer[current] + count, size); count += size; if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ; *bufferCount[current]=count ; /* info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "< 0) { lockBuffer() ; // if (*control[current]==0 && bufferCount[current] > 0) if (*bufferCount[current] > 0) { MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request); pending = true; // *control[current]=0 ; *firstTimeLine[current]=0 ; *bufferCount[current]=0 ; unlockBuffer() ; if (current == 1) current = 0; else current = 1; count = 0; } else unlockBuffer() ; } } return pending; } bool CClientBuffer::hasPendingRequest(void) { lockBuffer() ; count=*bufferCount[current] ; unlockBuffer() ; return (pending || count > 0); } bool CClientBuffer::isNotifiedFinalized(void) { bool ret ; lockBuffer() ; ret=*finalize[current] == 1 ? true : false ; unlockBuffer() ; return ret; } }