source: XIOS3/trunk/src/buffer_client.cpp @ 2458

Last change on this file since 2458 was 2458, checked in by ymipsl, 17 months ago

Merge XIOS_FILE_SERVICE dev branch into trunk

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 13.2 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "log.hpp"
4#include "buffer_out.hpp"
5#include "buffer_client.hpp"
6#include "cxios.hpp"
7#include "mpi.hpp"
8#include "tracer.hpp"
9#include "timeline_events.hpp"
10#include "timer.hpp"
11
12namespace xios
13{
14  size_t CClientBuffer::maxRequestSize = 0;
15
16  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
17    : interComm(interComm)
18    , clientRank_(0)
19    , serverRank(serverRank)
20    , bufferSize(bufferSize)
21    , estimatedMaxEventSize(estimatedMaxEventSize)
22    , maxEventSize(0)
23    , current(0)
24    , count(0)
25    , pending(false)
26    , hasWindows(false) 
27  {
28     /*
29      if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
30      else hasWindows=true ;
31     */
32
33      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
34      MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
35      buffer[0] = bufferHeader[0]+headerSize_ ;
36      buffer[1] = bufferHeader[1]+headerSize_ ;
37      firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_ ;
38      firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_ ;
39      bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
40      bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
41      control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;
42      control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
43      notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
44      notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
45
46      *firstTimeLine[0]=0 ;
47      *firstTimeLine[1]=0 ;
48      *bufferCount[0]=0 ;
49      *bufferCount[1]=0 ;
50      *control[0]=0 ;
51      *control[1]=0 ;
52      *notify[0]=notifyNothing_ ;
53      *notify[1]=notifyNothing_ ;
54      winState[0]=false ;
55      winState[1]=false ;
56
57
58    if (hasWindows)
59    { 
60   
61      MPI_Aint buffSize=bufferSize+headerSize_ ;
62      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
63      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
64   
65      MPI_Group group ;
66      int groupSize,groupRank ;
67      MPI_Win_get_group(windows_[0], &group) ;
68      MPI_Group_size(group, &groupSize) ;
69      MPI_Group_rank(group, &groupRank) ;
70      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
71
72      MPI_Win_get_group(windows_[1], &group) ;
73      MPI_Group_size(group, &groupSize) ;
74      MPI_Group_rank(group, &groupRank) ;
75      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
76
77      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
78      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
79
80      MPI_Win_unlock(clientRank_, windows_[1]) ;
81      MPI_Win_unlock(clientRank_, windows_[0]) ;
82    } 
83    retBuffer = new CBufferOut(buffer[current], bufferSize);
84    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
85  }
86
87  MPI_Aint CClientBuffer::getWinAddress(int i)
88  {
89    MPI_Aint address ;
90    MPI_Get_address(bufferHeader[i], &address) ;
91    return address ;
92  }
93
94  void CClientBuffer::attachWindows(vector<MPI_Win>& windows)
95  {
96    isAttachedWindows_=true ;
97    windows_=windows ;
98    if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ;
99    else hasWindows=true ;
100
101    if (hasWindows)
102    { 
103      MPI_Aint buffSize=bufferSize+headerSize_ ;
104      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
105      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
106   
107      MPI_Group group ;
108      int groupSize,groupRank ;
109      MPI_Win_get_group(windows_[0], &group) ;
110      MPI_Group_size(group, &groupSize) ;
111      MPI_Group_rank(group, &groupRank) ;
112      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
113
114      MPI_Win_get_group(windows_[1], &group) ;
115      MPI_Group_size(group, &groupSize) ;
116      MPI_Group_rank(group, &groupRank) ;
117      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
118
119      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
120      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
121
122      MPI_Win_unlock(clientRank_, windows_[1]) ;
123      MPI_Win_unlock(clientRank_, windows_[0]) ;
124    } 
125
126  }
127
128
129  CClientBuffer::~CClientBuffer()
130  {
131     //freeWindows() ;
132     if (hasWindows)
133     {
134       MPI_Win_detach(windows_[0],bufferHeader[0]);
135       MPI_Win_detach(windows_[1],bufferHeader[1]);
136       MPI_Free_mem(bufferHeader[0]) ;
137       MPI_Free_mem(bufferHeader[1]) ;
138     }
139     delete retBuffer;
140  }
141
142  void CClientBuffer::lockBuffer(void)
143  {
144    CTimer::get("lock buffer").resume();
145    if (hasWindows)
146    {
147      if (winState[current]==true) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo lock client buffer but winState said it is already locked") ;
148      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
149      winState[current]=true ;
150    }
151    CTimer::get("lock buffer").suspend();
152  }
153
154  void CClientBuffer::unlockBuffer(void)
155  {
156    CTimer::get("unlock buffer").resume();
157    if (hasWindows)
158    {
159      if (winState[current]==false) ERROR("CClientBuffer::lockBuffer(void)",<<"Try lo unlock client buffer but winState said it is already unlocked") ;
160      MPI_Win_unlock(clientRank_, windows_[current]) ;
161      winState[current]=false ;
162    }
163    CTimer::get("unlock buffer").suspend();
164  }
165
166  StdSize CClientBuffer::remain(void)
167  {
168    return bufferSize - count;
169  }
170
171  bool CClientBuffer::isBufferFree(StdSize size)
172  {
173    if (!isAttachedWindows_) return false;
174
175    lockBuffer();
176    count=*bufferCount[current] ;
177   
178    if (resizingBufferStep_ > 0 ) return false ;
179
180    if (size > bufferSize)
181    {
182      resizingBufferStep_=1 ;
183      *firstTimeLine[current]=0 ;
184      newBufferSize_=size ;
185      return false ;
186    }
187
188    if (size > maxEventSize)
189    {
190      maxEventSize = size;
191
192      if (size > estimatedMaxEventSize)
193        error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank
194                 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;
195
196      if (size > maxRequestSize) maxRequestSize = size;
197    }
198   
199    if (size > remain())
200    {
201      if (isGrowableBuffer_)
202      {
203        resizingBufferStep_ = 1 ;
204        *firstTimeLine[current]=0 ;
205        newBufferSize_ = (count+size)*growFactor_ ;
206      } 
207      return false ;
208    }
209    else return true ;
210  }
211
212
213  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
214  {
215    if (size <= remain())
216    {
217      retBuffer->realloc(buffer[current] + count, size);
218      count += size;
219      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
220      *bufferCount[current]=count ;
221      return retBuffer;
222    }
223    else
224    {
225      ERROR("CBufferOut* CClientBuffer::getBuffer(StdSize size)",
226            << "Not enough space in buffer, this should not have happened...");
227      return NULL;
228    }
229  }
230
231  void CClientBuffer::infoBuffer(void)
232  {
233     
234      char checksum=0 ;
235      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ;
236 
237      char checksumFirst=0 ;
238      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ;
239 
240      char checksumLast=0 ;
241      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ;
242 
243      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current]
244              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" "
245              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" "
246              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ;
247
248  }
249
250  bool CClientBuffer::checkBuffer(bool send)
251  {
252    MPI_Status status;
253    int flag;
254    MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
255 
256    if (pending)
257    {
258      traceOff();
259      MPI_Test(&request, &flag, &status);
260      traceOn();
261      if (flag == true) pending = false;
262    }
263
264    if (!pending)
265    {
266      if (!send && resizingBufferStep_==0 ) return false ;
267
268      if (count > 0)
269      {
270        double time=MPI_Wtime() ;
271        if (time - lastCheckedWithNothing_ > latency_)
272        {
273          lockBuffer() ;
274          if (*bufferCount[current] > 0)
275          {
276            MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
277            if (resizingBufferStep_==4) resizingBufferStep_=0 ;
278            pending = true;
279            *firstTimeLine[current]=0 ;
280            *bufferCount[current]=0 ;
281
282             unlockBuffer() ;
283
284            if (current == 1) current = 0;
285            else current = 1;
286            count = 0;
287          }
288          else 
289          {
290            unlockBuffer() ;
291            lastCheckedWithNothing_ = time ;
292          }
293        }
294      }
295      else
296      {
297        if (resizingBufferStep_==1) resizeBufferNotify() ;
298        else if (resizingBufferStep_==2) isNotifiedChangeBufferSize() ;
299        else if (resizingBufferStep_==3) resizeBuffer(newBufferSize_) ;
300      }
301    }
302
303    return pending;
304  }
305
306  void CClientBuffer::resizeBufferNotify(void)
307  {
308    // notify server of changing buffers size
309    lockBuffer() ;
310    int size=sizeof(int)+sizeof(size_t) ;
311    CBufferOut* bufOut = this->getBuffer(timelineEventNotifyChangeBufferSize, size);
312    bufOut->put(size);
313    bufOut->put(timelineEventNotifyChangeBufferSize);
314    resizingBufferStep_ = 2 ;
315    unlockBuffer() ;
316  }
317
318  void CClientBuffer::resizeBuffer(size_t newSize)
319  {
320
321    if (hasWindows)
322    { 
323      MPI_Win_detach(windows_[0], bufferHeader[0]) ;
324      MPI_Win_detach(windows_[1], bufferHeader[1]) ;
325    }
326    MPI_Free_mem(bufferHeader[0]) ;
327    MPI_Free_mem(bufferHeader[1]) ;
328
329    bufferSize=newSize ;
330    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
331    MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[1]) ;
332    buffer[0] = bufferHeader[0]+headerSize_ ;
333    buffer[1] = bufferHeader[1]+headerSize_ ;
334    firstTimeLine[0]=(size_t*)bufferHeader[0] + timeLineOffset_;
335    firstTimeLine[1]=(size_t*)bufferHeader[1] + timeLineOffset_;
336    bufferCount[0]=(size_t*)bufferHeader[0] + countOffset_ ;
337    bufferCount[1]=(size_t*)bufferHeader[1] + countOffset_ ;
338    control[0]=(size_t*)bufferHeader[0] + controlOffset_ ;  // control=0 => nothing ; control=1 => changeBufferSize
339    control[1]=(size_t*)bufferHeader[1] + controlOffset_ ;
340    notify[0]=(size_t*)bufferHeader[0] + notifyOffset_ ;
341    notify[1]=(size_t*)bufferHeader[1] + notifyOffset_ ;
342
343    *firstTimeLine[0]=0 ;
344    *firstTimeLine[1]=0 ;
345    *bufferCount[0]=0 ;
346    *bufferCount[1]=0 ;
347    *control[0]=0 ;
348    *control[1]=0 ;
349    *notify[0] = notifyNothing_ ;
350    *notify[1] = notifyNothing_ ;
351    winState[0]=false ;
352    winState[1]=false ;
353    current=0 ;
354   
355    if (hasWindows)
356    { 
357   
358      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize_) ;
359      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize_) ;
360         
361      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
362      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
363
364      MPI_Win_unlock(clientRank_, windows_[1]) ;
365      MPI_Win_unlock(clientRank_, windows_[0]) ;
366    } 
367
368    lockBuffer() ;
369 
370    int size=sizeof(int)+2*sizeof(size_t)+2*sizeof(MPI_Aint) ;
371    CBufferOut* bufOut = this->getBuffer(timelineEventChangeBufferSize, size);
372    bufOut->put(size);
373    bufOut->put(timelineEventChangeBufferSize);
374    bufOut->put(newBufferSize_);
375    bufOut->put(this->getWinAddress(0));
376    bufOut->put(this->getWinAddress(1));
377
378    resizingBufferStep_=4;
379    unlockBuffer() ;
380    info(100)<<"CClientBuffer::resizeBuffer(size_t newSize) : resizing buffer of server "<<serverRank<<" ; new size : "<<newSize<<" ; winAdress[0] "<<this->getWinAddress(0)<<" winAdress[1] "<<this->getWinAddress(1)<<endl;
381  }
382
383  bool CClientBuffer::hasPendingRequest(void)
384  {
385   
386    lockBuffer() ;
387    count=*bufferCount[current] ;
388    unlockBuffer() ;
389
390    return (pending || count > 0);
391  }
392
393  bool CClientBuffer::isNotifiedChangeBufferSize(void)
394  {
395   
396    bool ret ;
397    lockBuffer() ;
398    ret=*notify[current] == notifyResizeBuffer_ ? true : false ;
399    if (ret || !hasWindows) 
400    {
401      *notify[current] = notifyNothing_ ;
402      resizingBufferStep_=3; 
403    }
404    unlockBuffer() ;
405
406    return ret;
407  }
408
409  bool CClientBuffer::isNotifiedFinalized(void)
410  {
411    if (!isFinalized_)
412    {
413      double time=MPI_Wtime() ;
414//      if (time - lastCheckedNotify_ > latency_)
415      {
416        int flag ;
417        MPI_Iprobe(MPI_ANY_SOURCE, MPI_ANY_TAG, MPI_COMM_WORLD, &flag, MPI_STATUS_IGNORE);
418        lockBuffer() ;
419        isFinalized_=*notify[current] == notifyFinalize_ ? true : false ;
420        unlockBuffer() ;
421        lastCheckedNotify_=time ;
422      }
423    }
424    return isFinalized_ ;
425  }
426
427}
Note: See TracBrowser for help on using the repository browser.