source: XIOS3/trunk/src/transport/one_sided_client_buffer.cpp @ 2589

Last change on this file since 2589 was 2589, checked in by jderouillat, 8 months ago

Specify the usage of the xios namespace to overload the MPI funtions

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 8.8 KB
Line 
1#include "one_sided_client_buffer.hpp"
2#include "event_client.hpp"
3#include "timer.hpp"
4
5namespace xios
6{
7 
8  extern CLogType logProtocol;
9
10  COneSidedClientBuffer::COneSidedClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) : interComm_(interComm), serverRank_(serverRank), interCommMerged_(interCommMerged), intraServerRank_(intraServerRank)
11  {
12   
13    MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
14    control_[CONTROL_ADDR] = 0 ;
15    control_[CONTROL_FINALIZE] = 0 ;
16    sendNewBuffer() ;
17    createWindow(commSelf, interCommMerged, intraServerRank ) ;
18  }
19
20  void COneSidedClientBuffer::createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank )
21  {
22    CTimer::get("create Windows").resume() ;
23    MPI_Comm interComm ;
24    xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, intraServerRank, 0, &interComm) ;
25    xios::MPI_Intercomm_merge(interComm, false, &winComm_) ;
26    int rank ;
27    MPI_Comm_rank(winComm_,&rank) ;
28    info(logProtocol)<<"Windows rank="<<rank<<endl ;
29    CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    xios::MPI_Comm_free(&interComm) ;
31   
32    maxWindows_=MAX_WINDOWS ;   
33    windows_.resize(maxWindows_) ;
34    usedWindows_.resize(maxWindows_,false) ;
35    for(int i=0;i<maxWindows_;++i) 
36    {
37      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38      CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    }
40    currentWindow_=-1 ;
41   
42    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
43    CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
44
45    MPI_Barrier(winComm_) ;
46    MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ;
47    MPI_Barrier(winComm_) ;
48    CTimer::get("create Windows").suspend() ;
49 
50 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ;
51 //   MPI_Win_unlock(0,winControl_) ;
52
53  } 
54
55
56
57  void COneSidedClientBuffer::newBuffer(size_t size, bool fixed)
58  { 
59    currentWindow_=(currentWindow_+1)%maxWindows_ ;
60    if (usedWindows_[currentWindow_]) 
61    {
62      ERROR("void COneSidedClientBuffer::newBuffer(size_t size, bool fixed)",<<"Try to alloc buffer to a window already in use"<<endl ) ;
63    }
64    else usedWindows_[currentWindow_]=true ;
65    buffers_.push_back(new CBuffer(windows_[currentWindow_], size, fixed)); 
66    currentBuffer_=buffers_.back() ;
67    info(logProtocol)<<"   Nb attached memory blocs="<<buffers_.size()<<endl ;
68  }
69
70  bool COneSidedClientBuffer::isBufferFree(size_t size)
71  {
72    if (buffers_.size()>maxWindows_-1) return false ; 
73    CBuffer* buffer ;
74    if (buffers_.size()==0) return true ;
75    else if (!fixed_) return true ;
76    else 
77    {
78      buffer = buffers_.back() ;
79      if (buffer->remain()>=size) return true ;
80      else
81      {
82        if (buffer->isFixed()) 
83        {
84          if ( size > buffer->getSize()) return true ;
85          else return false ;
86        }
87        else return true ;
88      }
89    }
90  }
91 
92  int COneSidedClientBuffer::writeBuffer(char* buffer, size_t size)
93  {
94    MPI_Aint addr ;
95    size_t start ;
96    size_t count ;
97    int nbBlocs=0 ;
98
99    if (isBufferFree(size))
100    {
101      while (size > 0)
102      {
103        if (buffers_.empty())
104        {
105          if (fixed_) newBuffer(fixedSize_,fixed_) ;
106          else
107          { 
108            if (currentBufferSize_==0) currentBufferSize_=size ;
109            newBuffer(currentBufferSize_, fixed_) ;
110          }
111        }
112        CBuffer* currentBuffer = buffers_.back() ;
113
114        MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, windows_[currentWindow_]) ;
115        currentBuffer->write(&buffer, size, addr, start, count) ;
116        if (count > 0) 
117        {
118          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
119          nbBlocs++ ; 
120        }
121
122        currentBuffer->write(&buffer, size, addr, start, count) ;
123        MPI_Win_unlock(0,windows_[currentWindow_]) ;
124
125        if (count > 0) 
126        {
127          blocs_.push_back({addr,currentBuffer_, start, static_cast<int>(count), currentWindow_}) ;
128          nbBlocs++ ; 
129        }
130
131        if (size>0) 
132        {
133          if (fixed_) 
134          {
135            currentBufferSize_ = fixedSize_ ;
136            newBuffer(currentBufferSize_,fixed_) ;
137          }
138          else
139          {
140            currentBufferSize_ = max((size_t)(currentBufferSize_*growingFactor_), size) ;
141            newBuffer(currentBufferSize_,fixed_) ;
142          }
143        } 
144      }     
145      // send message here ?
146      return nbBlocs ;
147    }
148    else return 0 ;
149  }
150
151  void COneSidedClientBuffer::freeBuffer(MPI_Aint addr)
152  {
153//    if (addr != lastFreedBloc_)
154    if (addr != 0)
155    {
156      while(freeBloc(addr)) ;
157//      lastFreedBloc_ = addr ;
158    }
159   
160    if (isFinalized_ && !buffers_.empty() && buffers_.front()->getCount()==0) 
161    {
162      delete buffers_.front() ;
163      buffers_.pop_front() ;
164    }
165  }
166 
167  bool COneSidedClientBuffer::freeBloc(MPI_Aint addr)
168  {
169    SBloc& bloc = blocs_.front() ;
170    bloc.buffer->free(bloc.start, bloc.count) ;
171    if (bloc.buffer->getCount()==0) 
172      if (buffers_.size()>1) 
173      { 
174        usedWindows_[bloc.window]=false ;
175        delete buffers_.front() ;
176        buffers_.pop_front() ;
177      }
178   
179    if (addr != bloc.addr) 
180    {
181      blocs_.pop_front() ;
182      return true ;
183    }
184    else 
185    {
186      blocs_.pop_front() ;
187      return false ;
188    }
189
190  }
191
192  bool COneSidedClientBuffer::writeEvent(size_t timeline, CEventClient& event)
193  {
194    size_t size = event.getSize() ;
195    if (isBufferFree(size))
196    {
197      CBufferOut buffer(size) ;
198      event.send(timeline, size, &buffer) ;
199      size_t bufferSizeBefore = currentBufferSize_ ; 
200      int nbBlocs = writeBuffer((char*)buffer.start(),buffer.count()) ;
201      if (currentBufferSize_!=bufferSizeBefore) sendResizeBufferEvent(timeline,currentBufferSize_) ;
202      sendTimelineEvent(timeline, event.getNbSender(), nbBlocs) ;
203      return true ;
204    }
205    else return false ;
206  }
207
208  void COneSidedClientBuffer::eventLoop(void)
209  {
210    // check to free requests
211    int flag ;
212    bool out = true;
213    SRequest request ; 
214    while (!requests_.empty() && out) 
215    {
216      request = requests_.front() ;
217      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ;
218      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ;
219      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ;
220      if (flag==true)
221      {
222        delete request.buffer ;
223        requests_.pop_front() ;
224      }
225      else out=false; 
226    }
227   
228    // check to free blocs
229    MPI_Aint addr ;
230    MPI_Aint finalize ;
231    MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, winControl_) ;
232    addr = control_[CONTROL_ADDR] ;
233    control_[CONTROL_ADDR] = 0 ;
234    finalize = control_[CONTROL_FINALIZE] ;
235    MPI_Win_unlock(0, winControl_) ;
236    freeBuffer(addr) ;
237    if (finalize==1) isFinalized_=true ;
238
239
240  }
241
242  void COneSidedClientBuffer::sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs)
243  {
244    ostringstream outStr ;
245    SRequest request ;
246    request.buffer = new CBufferOut(sizeof(timeline)+sizeof(nbSenders)+sizeof(nbBlocs)+(sizeof(MPI_Aint)+sizeof(int)+sizeof(int))*nbBlocs) ; 
247    *(request.buffer)<<timeline<<nbSenders<<nbBlocs ;
248    if (info.isActive(logProtocol))  outStr<<"New timeline event sent to server rank "<<serverRank_<<" : timeLine="<<timeline<<"  nbSenders="<<nbSenders<<"  nbBlocs="<<nbBlocs<<endl ;
249    auto it = blocs_.end() ;
250    for(int i=0 ; i<nbBlocs; ++i,--it) ;
251    for(int i=0 ; i<nbBlocs; ++i,++it) 
252    {
253      *(request.buffer) << it->addr << it->count << it->window;
254   
255      if (info.isActive(logProtocol))
256      {
257        size_t checksum=0 ;
258        for(size_t j=0;j<it->count;j++) checksum+=((unsigned char*)(it->addr))[j] ;
259        outStr<<"Bloc "<<i<<"  addr="<<it->addr<<"  count="<<it->count<<"  checksum="<<checksum<<"  ;  " ;
260      }
261    }
262    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ;
263    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
264    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ;
265    info(logProtocol)<<outStr.str()<<endl ;
266    requests_.push_back(request) ;
267  }
268
269  void COneSidedClientBuffer::sendResizeBufferEvent(size_t timeline, size_t size)
270  {
271    SRequest request ;
272    request.buffer = new CBufferOut(sizeof(EVENT_BUFFER_RESIZE)+sizeof(timeline)+sizeof(size)) ; 
273    *(request.buffer)<<EVENT_BUFFER_RESIZE<<timeline<<size ;
274    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, intraServerRank_, 20, interCommMerged_, &request.mpiRequest ) ;
275    requests_.push_back(request) ;
276  }
277
278  void COneSidedClientBuffer::sendNewBuffer(void)
279  {
280    MPI_Aint controlAddr ;
281    MPI_Get_address(control_, &controlAddr) ;
282    MPI_Send(&controlAddr, 1, MPI_AINT, intraServerRank_, 20, interCommMerged_) ;
283  }
284
285}
Note: See TracBrowser for help on using the repository browser.