source: XIOS3/trunk/src/transport/p2p_client_buffer.cpp @ 2594

Last change on this file since 2594 was 2594, checked in by jderouillat, 8 months ago

Update the p2p protocol as a mirror protocol : the servers buffers will strictly mirror (number of buffers, positions of messages in the buffers) the clients buffers. The memory consumption of servers will be capped impplicitly by the clients behavior where the time spent to wait for free buffers could be present again.

  • Property svn:executable set to *
File size: 11.0 KB
Line 
1#include "p2p_client_buffer.hpp"
2#include "event_client.hpp"
3#include "timer.hpp"
4
5namespace xios
6{
7 
8  extern CLogType logProtocol;
9
10  CP2pClientBuffer::CP2pClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank)
11  {
12   
13    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
14    //control_[CONTROL_ADDR] = 0 ;
15    //control_[CONTROL_FINALIZE] = 0 ;
16    sendNewBuffer() ;
17    createWindow(commSelf, interCommMerged, intraServerRank ) ;
18    char dummy ;
19    MPI_Irecv(&dummy, 0, MPI_CHAR, intraServerRank, 22, interCommMerged, &finalizeRequest_) ;
20  }
21
22  void CP2pClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank )
23  {
24    CTimer::get("create Windows").resume() ;
25    //MPI_Comm interComm ;
26    //xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ;
27    //xios::MPI_Intercomm_merge(interComm, false, &winComm_) ;
28    //int rank ;
29    //MPI_Comm_rank(winComm_,&rank) ;
30    //info(logProtocol)<<"Windows rank="<<rank<<endl ;
31    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
32    //xios::MPI_Comm_free(&interComm) ;
33   
34    maxWindows_=MAX_WINDOWS ;   
35    windows_.resize(maxWindows_) ;
36    usedWindows_.resize(maxWindows_,false) ;
37    //for(int i=0;i<maxWindows_;++i)
38    //{
39    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
40    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
41    //}
42    currentWindow_=-1 ;
43    currentMirror_=-1 ;
44   
45    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
46    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
47
48    //MPI_Barrier(winComm_) ;
49    //MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ;
50    //MPI_Barrier(winComm_) ;
51    CTimer::get("create Windows").suspend() ;
52 
53 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ;
54 //   MPI_Win_unlock(0,winControl_) ;
55
56  } 
57
58
59
60  void CP2pClientBuffer::newBuffer(size_t size, bool fixed)
61  { 
62    currentMirror_++;
63    currentWindow_=(currentWindow_+1)%maxWindows_ ;
64    //if (usedWindows_[currentWindow_])
65    //{
66    //  ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;
67    //}
68    //else usedWindows_[currentWindow_]=true ;
69    buffers_.push_back(new CBuffer(windows_[currentMirror_], size, fixed)); 
70    currentBuffer_=buffers_.back() ;
71    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ;
72  }
73
74  bool CP2pClientBuffer::isBufferFree(size_t size)
75  {
76    if (sentBlocRequest_.size()> maxSentBlocRequests_) return false ;
77    if (buffers_.size()>maxWindows_-1) return false ; 
78    CBuffer* buffer ;
79    if (buffers_.size()==0) return true ;
80    else if (!fixed_) return true ;
81    else 
82    {
83      buffer = buffers_.back() ;
84      if (buffer->remain()>=size) return true ;
85      else
86      {
87        if (buffer->isFixed()) 
88        {
89          if ( size > buffer->getSize()) return true ;
90          else return false ;
91        }
92        else return true ;
93      }
94    }
95  }
96 
97  int CP2pClientBuffer::writeBuffer(char* buffer, size_t size)
98  {
99    MPI_Aint addr ;
100    size_t start ;
101    size_t count ;
102    int nbBlocs=0 ;
103
104    if (isBufferFree(size))
105    {
106      while (size > 0)
107      {
108        if (buffers_.empty())
109        {
110          if (fixed_) {
111            currentBufferSize_=fixedSize_;
112            newBuffer(fixedSize_,fixed_) ;
113          }
114          else
115          { 
116            if (currentBufferSize_==0) currentBufferSize_=size ;
117            newBuffer(currentBufferSize_, fixed_) ;
118          }
119        }
120        CBuffer* currentBuffer = buffers_.back() ;
121
122        //MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, windows_[currentWindow_]) ;
123        currentBuffer->write(&buffer, size, addr, start, count) ;
124        if (count > 0) 
125        {
126          //info(logProtocol) << "Using currentMirror_ 1 : "<<currentMirror_ << endl;
127          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ;
128          nbBlocs++ ; 
129        }
130
131        currentBuffer->write(&buffer, size, addr, start, count) ;
132        //MPI_Win_unlock(0,windows_[currentWindow_]) ;
133
134        if (count > 0) 
135        {
136          //info(logProtocol) << "Using currentMirror_ 2 : "<<currentMirror_ << endl;
137          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ;
138          nbBlocs++ ; 
139        }
140
141        if (size>0) 
142        {
143          if (fixed_) 
144          {
145            currentBufferSize_ = fixedSize_ ;
146            newBuffer(currentBufferSize_,fixed_) ;
147          }
148          else
149          {
150            currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ;
151            newBuffer(currentBufferSize_,fixed_) ;
152          }
153        } 
154      }     
155      // send message here ?
156      return nbBlocs ;
157    }
158    else return 0 ;
159  }
160
161  void CP2pClientBuffer::freeBuffer(MPI_Aint addr)
162  {
163    if (addr != 0)
164    {
165      while(freeBloc(addr)) ;
166    }
167   
168    if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) 
169    {
170      delete buffers_.front() ;
171      buffers_.pop_front() ;
172    }
173  }
174 
175  bool CP2pClientBuffer::freeBloc(MPI_Aint addr)
176  {
177    SBloc& bloc = blocs_.front() ;
178   
179    if (info.isActive(logProtocol))
180    {
181      size_t checksum=0 ;
182      for(size_t j=0;j<bloc.count;j++) checksum+=((unsigned char*)(bloc.addr))[j] ;
183      info(logProtocol)<<"free bloc sent to server rank "<<serverRank_<<" : addr="<<bloc.addr<<"  start="<<bloc.start<<"  count="<<bloc.count<<"  checksum="<<checksum<<endl ;
184    }
185
186    bloc.buffer->free(bloc.start, bloc.count) ;
187    if (bloc.buffer->getCount()==0) 
188      if (buffers_.size()>1) 
189      { 
190        //usedWindows_[bloc.window]=false ;
191        delete buffers_.front() ;
192        buffers_.pop_front() ;
193      }
194   
195    if (addr != bloc.addr) 
196    {
197      blocs_.pop_front() ;
198      return true ;
199    }
200    else 
201    {
202      blocs_.pop_front() ;
203      return false ;
204    }
205
206  }
207
208  bool CP2pClientBuffer::writeEvent(size_t timeline, CEventClient& event)
209  {
210    size_t size = event.getSize() ;
211    if (isBufferFree(size))
212    {
213      CBufferOut buffer(size) ;
214      event.send(timeline, size, &buffer) ;
215      size_t bufferSizeBefore = currentBufferSize_ ; 
216      int nbBlocs = writeBuffer((char*)buffer.start(),buffer.count()) ;
217      if (currentBufferSize_!=bufferSizeBefore) sendResizeBufferEvent(timeline,currentBufferSize_) ;
218      sendTimelineEvent(timeline, event.getNbSender(), nbBlocs) ;
219      return true ;
220    }
221    else return false ;
222  }
223
224  void CP2pClientBuffer::eventLoop(void)
225  {
226    // check to free requests
227    int flag ;
228    bool out = true;
229    SRequest request ; 
230    while (!requests_.empty() && out) 
231    {
232      request = requests_.front() ;
233      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ;
234      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ;
235      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ;
236      if (flag==true)
237      {
238        delete request.buffer ;
239        requests_.pop_front() ;
240      }
241      else out=false; 
242    }
243   
244    // check to free blocs
245//    MPI_Aint addr ;
246//    MPI_Aint finalize ;
247//    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, winControl_) ;
248//    addr = control_[CONTROL_ADDR] ;
249//    control_[CONTROL_ADDR] = 0 ;
250//    finalize = control_[CONTROL_FINALIZE] ;
251//    MPI_Win_unlock(0, winControl_) ;
252    MPI_Aint addr=0 ;
253    auto it=sentBlocRequest_.begin() ;
254    for( ; it!=sentBlocRequest_.end() ; ++it)
255    {
256      int flag=true ;
257      MPI_Status status ; 
258      MPI_Test(&it->mpiRequest, &flag, &status) ;
259      if (flag==false) 
260      {
261//        ++it ;
262        break ;
263      }
264      else addr = it->addr;
265    }
266    if (addr!=0) sentBlocRequest_.erase(sentBlocRequest_.begin(), it) ;
267    freeBuffer(addr) ;
268
269    // if (finalize==1) isFinalized_=true ;
270    listenFinalize() ;
271  }
272
273  void CP2pClientBuffer::listenFinalize(void)
274  {
275    if (!isFinalized_)
276    {
277      int flag ;
278      MPI_Status status ;
279      MPI_Test(&finalizeRequest_,&flag, &status) ;
280      if (flag) isFinalized_=true;
281    }
282  }
283
284  void CP2pClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs)
285  {
286    ostringstream outStr ;
287    SRequest request ;
288    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int)+sizeof(size_t))*nbBlocs) ; 
289    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ;
290    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ;
291    auto it = blocs_.end() ;
292    for(int i=0 ; i<nbBlocs; ++i,--it) ;
293    for(int i=0 ; i<nbBlocs; ++i,++it) 
294    {
295      *(request.buffer) << it->addr << it->count << it->window << it->start;
296   
297      if (info.isActive(logProtocol))
298      {
299        size_t checksum=0 ;
300        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ;
301        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  window="<<it->window<<"  start="<<it->start<<"  ;  " ;
302      }
303
304      sentBlocRequest_.emplace_back() ;
305      sentBlocRequest_.back().addr = it->addr ; 
306      MPI_Issend((void*)(it->addr), it->count, MPI_CHAR, intraServerRank_, 21, interCommMerged_, &sentBlocRequest_.back().mpiRequest) ;
307    }
308    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ;
309    //info(logProtocol) << "Send event : " << request.buffer->count() << endl;
310    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
311    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ;
312    info(logProtocol)<<outStr.str()<<endl ;
313    requests_.push_back(request) ;
314  }
315
316  void CP2pClientBuffer::sendResizeBufferEvent(size_t timeline, size_t size)
317  {
318    SRequest request ;
319    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 
320    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ;
321    //info(logProtocol) << "Send resize : " << request.buffer->count() << endl;
322    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
323    requests_.push_back(request) ;
324  }
325
326  void CP2pClientBuffer::sendNewBuffer(void)
327  {
328    MPI_Aint controlAddr ;
329//    MPI_Get_address(control_, &controlAddr) ;
330    MPI_Send(&controlAddr, 1, MPI_AINT, intraServerRank_, 20, interCommMerged_) ;
331  }
332
333}
Note: See TracBrowser for help on using the repository browser.