source: XIOS3/trunk/src/transport/p2p_client_buffer.cpp @ 2607

Last change on this file since 2607 was 2607, checked in by jderouillat, 4 months ago

In the P2P protocol, forces to allocate the fixed buffer if it is defined, accept more buffers in the growth phase. Add a missing timer in the interface

  • Property svn:executable set to *
File size: 11.4 KB
Line 
1#include "p2p_client_buffer.hpp"
2#include "event_client.hpp"
3#include "timer.hpp"
4
5namespace xios
6{
7 
8  extern CLogType logProtocol;
9
10  CP2pClientBuffer::CP2pClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank)
11  {
12   
13    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
14    //control_[CONTROL_ADDR] = 0 ;
15    //control_[CONTROL_FINALIZE] = 0 ;
16    sendNewBuffer() ;
17    createWindow(commSelf, interCommMerged, intraServerRank ) ;
18    char dummy ;
19    MPI_Irecv(&dummy, 0, MPI_CHAR, intraServerRank, 22, interCommMerged, &finalizeRequest_) ;
20  }
21
22  void CP2pClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank )
23  {
24    CTimer::get("create Windows").resume() ;
25    //MPI_Comm interComm ;
26    //xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ;
27    //xios::MPI_Intercomm_merge(interComm, false, &winComm_) ;
28    //int rank ;
29    //MPI_Comm_rank(winComm_,&rank) ;
30    //info(logProtocol)<<"Windows rank="<<rank<<endl ;
31    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
32    //xios::MPI_Comm_free(&interComm) ;
33   
34    maxWindows_=MAX_WINDOWS ;   
35    windows_.resize(maxWindows_) ;
36    usedWindows_.resize(maxWindows_,false) ;
37    //for(int i=0;i<maxWindows_;++i)
38    //{
39    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
40    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
41    //}
42    currentWindow_=-1 ;
43    currentMirror_=-1 ;
44   
45    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
46    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
47
48    //MPI_Barrier(winComm_) ;
49    //MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ;
50    //MPI_Barrier(winComm_) ;
51    CTimer::get("create Windows").suspend() ;
52 
53 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ;
54 //   MPI_Win_unlock(0,winControl_) ;
55
56  } 
57
58
59
60  void CP2pClientBuffer::newBuffer(size_t size, bool fixed)
61  { 
62    currentMirror_++;
63    currentWindow_=(currentWindow_+1)%maxWindows_ ;
64    //if (usedWindows_[currentWindow_])
65    //{
66    //  ERROR("void CP2pClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;
67    //}
68    //else usedWindows_[currentWindow_]=true ;
69    buffers_.push_back(new CBuffer(windows_[currentMirror_], size, fixed)); 
70    currentBuffer_=buffers_.back() ;
71    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<", size of the last buffer = " << size << endl ;
72  }
73
74  bool CP2pClientBuffer::isBufferFree(size_t size)
75  {
76    if (sentBlocRequest_.size()> maxSentBlocRequests_) return false ;
77    if (buffers_.size()>maxWindows_-1) return false ; 
78    CBuffer* buffer ;
79    if (buffers_.size()==0) return true ;
80    else if (!fixed_) return true ;
81    else 
82    {
83      buffer = buffers_.back() ;
84      if (buffer->remain()>=size) return true ;
85      else
86      {
87        if (buffer->isFixed()) 
88        {
89          if ( size > buffer->getSize()) return true ;
90          else if ( currentBufferSize_ < fixedSize_ ) return true ;
91          else return false ;
92        }
93        else return true ;
94      }
95    }
96  }
97 
98  int CP2pClientBuffer::writeBuffer(char* buffer, size_t size)
99  {
100    MPI_Aint addr ;
101    size_t start ;
102    size_t count ;
103    int nbBlocs=0 ;
104
105    if (isBufferFree(size))
106    {
107      while (size > 0)
108      {
109        if (buffers_.empty())
110        {
111          if (fixed_) {
112            currentBufferSize_=fixedSize_;
113            newBuffer(fixedSize_,fixed_) ;
114          }
115          else
116          { 
117            if (currentBufferSize_==0) currentBufferSize_=size ;
118            newBuffer(currentBufferSize_, fixed_) ;
119          }
120        }
121        else if ((currentBufferSize_ < fixedSize_)&&(fixed_))
122        {
123          // Forces to allocate the fixed buffer if defined
124          //   without this test, could be not done if each field size is < currentBufferSize_
125          currentBufferSize_ = fixedSize_ ;
126          newBuffer(currentBufferSize_, fixed_) ;
127        }
128        CBuffer* currentBuffer = buffers_.back() ;
129
130        //MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, windows_[currentWindow_]) ;
131        currentBuffer->write(&buffer, size, addr, start, count) ;
132        if (count > 0) 
133        {
134          //info(logProtocol) << "Using currentMirror_ 1 : "<<currentMirror_ << endl;
135          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ;
136          nbBlocs++ ; 
137        }
138
139        currentBuffer->write(&buffer, size, addr, start, count) ;
140        //MPI_Win_unlock(0,windows_[currentWindow_]) ;
141
142        if (count > 0) 
143        {
144          //info(logProtocol) << "Using currentMirror_ 2 : "<<currentMirror_ << endl;
145          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentMirror_}) ;
146          nbBlocs++ ; 
147        }
148
149        if (size>0) 
150        {
151          if (fixed_) 
152          {
153            currentBufferSize_ = fixedSize_ ;
154            newBuffer(currentBufferSize_,fixed_) ;
155          }
156          else
157          {
158            currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ;
159            newBuffer(currentBufferSize_,fixed_) ;
160          }
161        } 
162      }     
163      // send message here ?
164      return nbBlocs ;
165    }
166    else return 0 ;
167  }
168
169  void CP2pClientBuffer::freeBuffer(MPI_Aint addr)
170  {
171    if (addr != 0)
172    {
173      while(freeBloc(addr)) ;
174    }
175   
176    if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) 
177    {
178      delete buffers_.front() ;
179      buffers_.pop_front() ;
180    }
181  }
182 
183  bool CP2pClientBuffer::freeBloc(MPI_Aint addr)
184  {
185    SBloc& bloc = blocs_.front() ;
186   
187    if (info.isActive(logProtocol))
188    {
189      size_t checksum=0 ;
190      for(size_t j=0;j<bloc.count;j++) checksum+=((unsigned char*)(bloc.addr))[j] ;
191      info(logProtocol)<<"free bloc sent to server rank "<<serverRank_<<" : addr="<<bloc.addr<<"  start="<<bloc.start<<"  count="<<bloc.count<<"  checksum="<<checksum<<endl ;
192    }
193
194    bloc.buffer->free(bloc.start, bloc.count) ;
195    if (bloc.buffer->getCount()==0) 
196      if (buffers_.size()>1) 
197      { 
198        //usedWindows_[bloc.window]=false ;
199        delete buffers_.front() ;
200        buffers_.pop_front() ;
201      }
202   
203    if (addr != bloc.addr) 
204    {
205      blocs_.pop_front() ;
206      return true ;
207    }
208    else 
209    {
210      blocs_.pop_front() ;
211      return false ;
212    }
213
214  }
215
216  bool CP2pClientBuffer::writeEvent(size_t timeline, CEventClient& event)
217  {
218    size_t size = event.getSize() ;
219    if (isBufferFree(size))
220    {
221      CBufferOut buffer(size) ;
222      event.send(timeline, size, &buffer) ;
223      size_t bufferSizeBefore = currentBufferSize_ ; 
224      int nbBlocs = writeBuffer((char*)buffer.start(),buffer.count()) ;
225      if (currentBufferSize_!=bufferSizeBefore) sendResizeBufferEvent(timeline,currentBufferSize_) ;
226      sendTimelineEvent(timeline, event.getNbSender(), nbBlocs) ;
227      return true ;
228    }
229    else return false ;
230  }
231
232  void CP2pClientBuffer::eventLoop(void)
233  {
234    // check to free requests
235    int flag ;
236    bool out = true;
237    SRequest request ; 
238    while (!requests_.empty() && out) 
239    {
240      request = requests_.front() ;
241      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ;
242      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ;
243      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ;
244      if (flag==true)
245      {
246        delete request.buffer ;
247        requests_.pop_front() ;
248      }
249      else out=false; 
250    }
251   
252    // check to free blocs
253//    MPI_Aint addr ;
254//    MPI_Aint finalize ;
255//    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, winControl_) ;
256//    addr = control_[CONTROL_ADDR] ;
257//    control_[CONTROL_ADDR] = 0 ;
258//    finalize = control_[CONTROL_FINALIZE] ;
259//    MPI_Win_unlock(0, winControl_) ;
260    MPI_Aint addr=0 ;
261    auto it=sentBlocRequest_.begin() ;
262    for( ; it!=sentBlocRequest_.end() ; ++it)
263    {
264      int flag=true ;
265      MPI_Status status ; 
266      MPI_Test(&it->mpiRequest, &flag, &status) ;
267      if (flag==false) 
268      {
269//        ++it ;
270        break ;
271      }
272      else addr = it->addr;
273    }
274    if (addr!=0) sentBlocRequest_.erase(sentBlocRequest_.begin(), it) ;
275    freeBuffer(addr) ;
276
277    // if (finalize==1) isFinalized_=true ;
278    listenFinalize() ;
279  }
280
281  void CP2pClientBuffer::listenFinalize(void)
282  {
283    if (!isFinalized_)
284    {
285      int flag ;
286      MPI_Status status ;
287      MPI_Test(&finalizeRequest_,&flag, &status) ;
288      if (flag) isFinalized_=true;
289    }
290  }
291
292  void CP2pClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs)
293  {
294    ostringstream outStr ;
295    SRequest request ;
296    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int)+sizeof(size_t))*nbBlocs) ; 
297    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ;
298    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ;
299    auto it = blocs_.end() ;
300    for(int i=0 ; i<nbBlocs; ++i,--it) ;
301    for(int i=0 ; i<nbBlocs; ++i,++it) 
302    {
303      *(request.buffer) << it->addr << it->count << it->window << it->start;
304   
305      if (info.isActive(logProtocol))
306      {
307        size_t checksum=0 ;
308        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ;
309        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  window="<<it->window<<"  start="<<it->start<<"  ;  " ;
310      }
311
312      sentBlocRequest_.emplace_back() ;
313      sentBlocRequest_.back().addr = it->addr ; 
314      MPI_Issend((void*)(it->addr), it->count, MPI_CHAR, intraServerRank_, 21, interCommMerged_, &sentBlocRequest_.back().mpiRequest) ;
315    }
316    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ;
317    //info(logProtocol) << "Send event : " << request.buffer->count() << endl;
318    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
319    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ;
320    info(logProtocol)<<outStr.str()<<endl ;
321    requests_.push_back(request) ;
322  }
323
324  void CP2pClientBuffer::sendResizeBufferEvent(size_t timeline, size_t size)
325  {
326    SRequest request ;
327    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 
328    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ;
329    //info(logProtocol) << "Send resize : " << request.buffer->count() << endl;
330    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
331    requests_.push_back(request) ;
332  }
333
334  void CP2pClientBuffer::sendNewBuffer(void)
335  {
336    MPI_Aint controlAddr ;
337//    MPI_Get_address(control_, &controlAddr) ;
338    MPI_Send(&controlAddr, 1, MPI_AINT, intraServerRank_, 20, interCommMerged_) ;
339  }
340
341}
Note: See TracBrowser for help on using the repository browser.