// Member-function definitions for xios::CP2pClientBuffer (client-side buffer that
// talks to one server rank over MPI).
//
// NOTE(review): this copy of the file has lost its line breaks and some spans of
// text (e.g. after `"Windows rank="<`, inside `for(size_t j=0;j...`, and after
// `*(request.buffer)<`). Several function bodies are therefore incomplete here;
// restore the file from version control before changing any code. The notes below
// describe only what the visible text shows.
//
// Visible definitions:
//  - CP2pClientBuffer(interComm, serverRank, commSelf, interCommMerged, intraServerRank):
//      stores interComm, serverRank, interCommMerged and intraServerRank in members,
//      calls sendNewBuffer() and createWindow(...), then posts a zero-count MPI_Irecv
//      (tag 22, source intraServerRank, communicator interCommMerged) whose request
//      is kept in finalizeRequest_.
//      NOTE(review): the receive buffer `dummy` is a local of the constructor while
//      the request stays pending; count is 0 so presumably nothing is written -
//      confirm this is acceptable for the MPI implementation in use.
//  - createWindow(...): resumes the "create Windows" timer; the rest of the body is
//      not recoverable from this text.
//  - (function whose beginning is missing): ends with `return nbBlocs` / `return 0`;
//      the visible tail increments nbBlocs and calls newBuffer() with either
//      fixedSize_ or max(currentBufferSize_*growingFactor_, size).
//  - freeBuffer(addr): when addr != 0, calls freeBloc(addr) until it returns false;
//      afterwards, if isFinalized_ is set and the front buffer reports getCount()==0,
//      that buffer is deleted and popped from buffers_.
//  - freeBloc(addr): takes blocs_.front(); remainder of the body is missing.
//  - (function whose beginning is missing): tests pending requests with a
//      flag/status pair, erases completed entries from sentBlocRequest_, then calls
//      freeBuffer(addr) and listenFinalize().
//  - listenFinalize(): if isFinalized_ is not yet set, calls MPI_Test on
//      finalizeRequest_ and sets isFinalized_ when the test reports completion.
//  - sendTimelineEvent(timeline, nbSenders, nbBlocs): allocates a CBufferOut sized
//      for timeline, nbSenders, nbBlocs plus one (MPI_Aint, int, int, size_t) record
//      per bloc; the visible fragments show MPI_Issend with tag 21 and MPI_Isend with
//      tag 20 to intraServerRank_ on interCommMerged_. The middle of the body is
//      missing, so it is unclear whether the second MPI_Isend and the
//      requests_.push_back(request) belong to this function or to another one.
//  - sendNewBuffer(): blocking MPI_Send of one MPI_AINT (controlAddr) to
//      intraServerRank_ with tag 20 on interCommMerged_.
//      NOTE(review): the MPI_Get_address call that would set controlAddr appears to
//      be commented out, so the value sent looks uninitialized - confirm the receiver
//      ignores it, or initialize it.
#include "p2p_client_buffer.hpp" #include "event_client.hpp" #include "timer.hpp" namespace xios { extern CLogType logProtocol; CP2pClientBuffer::CP2pClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank) { //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ; //control_[CONTROL_ADDR] = 0 ; //control_[CONTROL_FINALIZE] = 0 ; sendNewBuffer() ; createWindow(commSelf, interCommMerged, intraServerRank ) ; char dummy ; MPI_Irecv(&dummy, 0, MPI_CHAR, intraServerRank, 22, interCommMerged, &finalizeRequest_) ; } void CP2pClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank ) { CTimer::get("create Windows").resume() ; //MPI_Comm interComm ; //xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ; //xios::MPI_Intercomm_merge(interComm, false, &winComm_) ; //int rank ; //MPI_Comm_rank(winComm_,&rank) ; //info(logProtocol)<<"Windows rank="<(count), currentMirror_}) ; nbBlocs++ ; } currentBuffer->write(&buffer, size, addr, start, count) ; //MPI_Win_unlock(0,windows_[currentWindow_]) ; if (count > 0) { //info(logProtocol) << "Using currentMirror_ 2 : "<(count), currentMirror_}) ; nbBlocs++ ; } if (size>0) { if (fixed_) { currentBufferSize_ = fixedSize_ ; newBuffer(currentBufferSize_,fixed_) ; } else { currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ; newBuffer(currentBufferSize_,fixed_) ; } } } // send message here ? 
return nbBlocs ; } else return 0 ; } void CP2pClientBuffer::freeBuffer(MPI_Aint addr) { if (addr != 0) { while(freeBloc(addr)) ; } if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) { delete buffers_.front() ; buffers_.pop_front() ; } } bool CP2pClientBuffer::freeBloc(MPI_Aint addr) { SBloc& bloc = blocs_.front() ; if (info.isActive(logProtocol)) { size_t checksum=0 ; for(size_t j=0;jmpiRequest, &flag, &status) ; if (flag==false) { // ++it ; break ; } else addr = it->addr; } if (addr!=0) sentBlocRequest_.erase(sentBlocRequest_.begin(), it) ; freeBuffer(addr) ; // if (finalize==1) isFinalized_=true ; listenFinalize() ; } void CP2pClientBuffer::listenFinalize(void) { if (!isFinalized_) { int flag ; MPI_Status status ; MPI_Test(&finalizeRequest_,&flag, &status) ; if (flag) isFinalized_=true; } } void CP2pClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs) { ostringstream outStr ; SRequest request ; request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int)+sizeof(size_t))*nbBlocs) ; *(request.buffer)<addr ; MPI_Issend((void*)(it->addr), it->count, MPI_CHAR, intraServerRank_, 21, interCommMerged_, &sentBlocRequest_.back().mpiRequest) ; } if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ; //info(logProtocol) << "Send event : " << request.buffer->count() << endl; MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ; info(logProtocol)<count() << endl; MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ; requests_.push_back(request) ; } void CP2pClientBuffer::sendNewBuffer(void) { MPI_Aint controlAddr ; // MPI_Get_address(control_, &controlAddr) ; MPI_Send(&controlAddr, 1, 
MPI_AINT, intraServerRank_, 20, interCommMerged_) ; } }