source: XIOS3/trunk/src/buffer_server.cpp @ 2362

Last change on this file since 2362 was 2309, checked in by ymipsl, 2 years ago

Bug fix when resizing buffer on server size.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 8.1 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "buffer_server.hpp"
4#include "timer.hpp"
5
6
7namespace xios
8{
9
10  CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize) 
11  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
12  {
13    size = 3 * buffSize;
14    first = 0;
15    current = 1;
16    end = size;
17    used=0 ;
18    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ;
19    currentWindows=1 ;
20    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
21  }
22
23  CServerBuffer::~CServerBuffer()
24  {
25    MPI_Free_mem(buffer) ;
26  }
27
28  void CServerBuffer::updateCurrentWindows(void)
29  {
30    if (currentWindows==0) currentWindows=1 ;
31    else currentWindows=0 ;
32  }
33
34
35  bool CServerBuffer::isBufferFree(size_t count)
36  {
37    bool ret ;
38
39    if (count==0) return true ;
40
41    if (current>first)
42    {
43      if (current+count<size)
44      {
45        ret=true ;
46      }
47      else if (current+count==size)
48      {
49        if (first>0)
50        {
51          ret=true ;
52        }
53        else
54        {
55          ret=false ;
56        }
57      }
58      else
59      {
60        if (count<first)
61        {
62          ret=true ;
63        }
64        else
65        {
66          ret=false ;
67        }
68      }
69    }
70    else
71    {
72      if (current+count<first)
73      {
74        ret=true ;
75      }
76      else
77      {
78         ret=false ;
79      }
80    }
81
82    return ret ;
83  }
84
85  bool CServerBuffer::isBufferEmpty(void)
86  {
87    if (used==0) return true ;
88    else return false;
89  }
90
91  void* CServerBuffer::getBuffer(size_t count)
92  {
93    char* ret ;
94
95    if (count==0) return buffer+current ;
96
97    if (current>first)
98    {
99      if (current+count<size)
100      {
101        ret=buffer+current ;
102        current+=count ;
103      }
104      else if (current+count==size)
105      {
106        if (first>0)
107        {
108          ret=buffer+current ;
109          current=0 ;
110        }
111        else
112        {
113          ERROR("void* CServerBuffer::getBuffer(size_t count)",
114                 <<"cannot allocate required size in buffer") ;
115        }
116      }
117      else
118      {
119        end=current ;
120        if (count<first)
121        {
122          ret=buffer ;
123          current=count ;
124        }
125        else
126        {
127          ERROR("void* CServerBuffer::getBuffer(size_t count)",
128                 <<"cannot allocate required size in buffer") ;
129        }
130      }
131    }
132    else
133    {
134      if (current+count<first)
135      {
136        ret=buffer+current ;
137        current+=count ;
138      }
139      else
140      {
141          ERROR("void* CServerBuffer::getBuffer(size_t count)",
142                 <<"cannot allocate required size in buffer") ;
143      }
144    }
145
146    used+=count ;
147    return ret ;
148  }
149
150  void CServerBuffer::freeBuffer(size_t count)
151  {
152    if (count==0) return ;
153
154    if (first==end-1)
155    {
156      first=0 ;
157      count-- ;
158      used-- ;
159      end=size ;
160    }
161
162    if (first<=current)
163    {
164      if (first+count <current)
165      {
166        first+=count ;
167      }
168      else
169      {
170          ERROR("void CServerBuffer::freeBuffer(size_t count)",
171                 <<"cannot free required size in buffer") ;
172      }
173
174    }
175    else
176    {
177      if (first+count<end)
178      {
179        first+=count ;
180      }
181      else
182      {
183          ERROR("void CServerBuffer::freeBuffer(size_t count)",
184                 <<"cannot free required size in buffer") ;
185      }
186    }
187    used-=count ;
188  }
189
190  void CServerBuffer::popBuffer(size_t count)
191  {
192    if (count==0) return ;
193
194    if (current==0) 
195    {
196      current = end ;
197      end=size ;
198    }
199   
200
201    if (first<=current)
202    {
203      if (current-count >first)
204      {
205        current-=count ;
206      }
207      else
208      {
209          ERROR("void CServerBuffer::popBuffer(size_t count)",
210                 <<"cannot pop required size in buffer") ;
211      }
212
213    }
214    else
215    {
216      if (current-count>=0)
217      {
218        current-=count ;
219      }
220      else
221      {
222          ERROR("void CServerBuffer::freeBuffer(size_t count)",
223                 <<"cannot pop required size in buffer") ;
224      }
225    }
226    used-=count ;
227  }
228
229  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count)
230  {
231    count = -1 ;
232    if (!hasWindows || resizingBuffer_) return false ;
233    double time=MPI_Wtime() ;
234    if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false;
235    bufferFromClientTime_ = time ;
236    CTimer::get("getBufferFromClient").resume() ;   
237    size_t clientTimeline ;
238    size_t clientCount ;
239    bool ok=false ;
240   
241    MPI_Group group ;
242    int groupSize,groupRank ;
243    MPI_Win_get_group(windows_[currentWindows], &group) ;
244    MPI_Group_size(group, &groupSize) ;
245    MPI_Group_rank(group, &groupRank) ;
246   
247    lockBuffer(); 
248    CTimer::get("getBufferFromClient_locked").resume() ;   
249// lock is acquired
250
251    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
252    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
253    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
254
255    if (timeLine==clientTimeline)
256    {
257      buffer=(char*)getBuffer(clientCount) ;
258      count=clientCount ;
259      MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ;
260      clientTimeline = 0 ;
261      clientCount = 0 ;
262      MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
263      MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
264
265// release lock
266      CTimer::get("getBufferFromClient_locked").suspend() ;   
267      unlockBuffer() ;
268
269      ok=true ;
270      char checksum=0 ;
271      for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ;
272      char checksumFirst=0 ;
273      for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ;
274      char checksumLast=0 ;
275      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ;
276     
277      info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "
278              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" "
279              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" " 
280              <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ;
281
282    }
283    else
284    {
285      count=0 ;
286 
287 // release lock
288      CTimer::get("getBufferFromClient_locked").suspend() ; 
289      unlockBuffer() ;
290    }
291    CTimer::get("getBufferFromClient").suspend() ;   
292    if (ok) return true ;
293
294    return false ;
295  }
296 
297  void CServerBuffer::lockBuffer(void)
298  {
299    if (!hasWindows) return ;
300    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
301  }
302
303  void CServerBuffer::unlockBuffer(void)
304  {
305    if (!hasWindows) return ;
306    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ;
307  }
308 
309  void CServerBuffer::notifyClientFinalize(void)
310  {
311    if (!hasWindows) return ;
312    size_t notify=notifyFinalize_ ;
313    lockBuffer(); 
314// lock is acquired
315    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
316    unlockBuffer() ;
317  }
318
319  void CServerBuffer::notifyBufferResizing(void)
320  {
321    resizingBuffer_=true ;
322    if (!hasWindows) return ;
323    size_t notify=notifyResizeBuffer_ ;
324    lockBuffer(); 
325// lock is acquired
326    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
327    unlockBuffer() ;
328  }
329}
Note: See TracBrowser for help on using the repository browser.