source: XIOS3/trunk/src/buffer_server.cpp

Last change on this file was 2558, checked in by ymipsl, 9 months ago

p2p transport protocol

  • bug fix
  • more diagnostics
  • set buffer parameters to realistic values

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 8.2 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "buffer_server.hpp"
4#include "timer.hpp"
5#include "window_dynamic.hpp"
6
7
8namespace xios
9{
10
11  CServerBuffer::CServerBuffer(int clientRank, vector<CWindowDynamic*>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize) 
12  : hasWindows(true), clientRank_(clientRank), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
13  {
14    size = 3 * buffSize;
15    first = 0;
16    current = 1;
17    end = size;
18    used=0 ;
19    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ;
20    currentWindows=1 ;
21    if (windows[0]==nullptr && windows[1]==nullptr) hasWindows=false ;
22  }
23
24  CServerBuffer::~CServerBuffer()
25  {
26    MPI_Free_mem(buffer) ;
27  }
28
29  void CServerBuffer::updateCurrentWindows(void)
30  {
31    if (currentWindows==0) currentWindows=1 ;
32    else currentWindows=0 ;
33  }
34
35
36  bool CServerBuffer::isBufferFree(size_t count)
37  {
38    bool ret ;
39
40    if (count==0) return true ;
41
42    if (current>first)
43    {
44      if (current+count<size)
45      {
46        ret=true ;
47      }
48      else if (current+count==size)
49      {
50        if (first>0)
51        {
52          ret=true ;
53        }
54        else
55        {
56          ret=false ;
57        }
58      }
59      else
60      {
61        if (count<first)
62        {
63          ret=true ;
64        }
65        else
66        {
67          ret=false ;
68        }
69      }
70    }
71    else
72    {
73      if (current+count<first)
74      {
75        ret=true ;
76      }
77      else
78      {
79         ret=false ;
80      }
81    }
82
83    return ret ;
84  }
85
86  bool CServerBuffer::isBufferEmpty(void)
87  {
88    if (used==0) return true ;
89    else return false;
90  }
91
92  void* CServerBuffer::getBuffer(size_t count)
93  {
94    char* ret ;
95
96    if (count==0) return buffer+current ;
97
98    if (current>first)
99    {
100      if (current+count<size)
101      {
102        ret=buffer+current ;
103        current+=count ;
104      }
105      else if (current+count==size)
106      {
107        if (first>0)
108        {
109          ret=buffer+current ;
110          current=0 ;
111        }
112        else
113        {
114          ERROR("void* CServerBuffer::getBuffer(size_t count)",
115                 <<"cannot allocate required size in buffer") ;
116        }
117      }
118      else
119      {
120        end=current ;
121        if (count<first)
122        {
123          ret=buffer ;
124          current=count ;
125        }
126        else
127        {
128          ERROR("void* CServerBuffer::getBuffer(size_t count)",
129                 <<"cannot allocate required size in buffer") ;
130        }
131      }
132    }
133    else
134    {
135      if (current+count<first)
136      {
137        ret=buffer+current ;
138        current+=count ;
139      }
140      else
141      {
142          ERROR("void* CServerBuffer::getBuffer(size_t count)",
143                 <<"cannot allocate required size in buffer") ;
144      }
145    }
146
147    used+=count ;
148    return ret ;
149  }
150
151  void CServerBuffer::freeBuffer(size_t count)
152  {
153    if (count==0) return ;
154
155    if (first==end-1)
156    {
157      first=0 ;
158      count-- ;
159      used-- ;
160      end=size ;
161    }
162
163    if (first<=current)
164    {
165      if (first+count <current)
166      {
167        first+=count ;
168      }
169      else
170      {
171          ERROR("void CServerBuffer::freeBuffer(size_t count)",
172                 <<"cannot free required size in buffer") ;
173      }
174
175    }
176    else
177    {
178      if (first+count<end)
179      {
180        first+=count ;
181      }
182      else
183      {
184          ERROR("void CServerBuffer::freeBuffer(size_t count)",
185                 <<"cannot free required size in buffer") ;
186      }
187    }
188    used-=count ;
189  }
190
191  void CServerBuffer::popBuffer(size_t count)
192  {
193    if (count==0) return ;
194
195    if (current==0) 
196    {
197      current = end ;
198      end=size ;
199    }
200   
201
202    if (first<=current)
203    {
204      if (current-count >first)
205      {
206        current-=count ;
207      }
208      else
209      {
210          ERROR("void CServerBuffer::popBuffer(size_t count)",
211                 <<"cannot pop required size in buffer") ;
212      }
213
214    }
215    else
216    {
217      if (current-count>=0)
218      {
219        current-=count ;
220      }
221      else
222      {
223          ERROR("void CServerBuffer::freeBuffer(size_t count)",
224                 <<"cannot pop required size in buffer") ;
225      }
226    }
227    used-=count ;
228  }
229
230  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count)
231  {
232    count = -1 ;
233    if (!hasWindows || resizingBuffer_) return false ;
234    double time=MPI_Wtime() ;
235    if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false;
236    bufferFromClientTime_ = time ;
237    CTimer::get("getBufferFromClient").resume() ;   
238    size_t clientTimeline ;
239    size_t clientCount ;
240    bool ok=false ;
241   
242    info(100)<<"getBufferFromClient : check data in client buffer  : clientRank "<<clientRank_<<" timeline "<<timeLine<<" ??"<< endl ;
243    lockBuffer(); 
244    CTimer::get("getBufferFromClient_locked").resume() ;   
245// lock is acquired
246
247    windows_[currentWindows]->get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
248    windows_[currentWindows]->get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
249    windows_[currentWindows]->flush(windowsRank_) ;
250   
251    if (timeLine==clientTimeline)
252    {
253      buffer=(char*)getBuffer(clientCount) ;
254      count=clientCount ;
255      windows_[currentWindows]->get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR) ;
256     
257      clientTimeline = 0 ;
258      clientCount = 0 ;
259      windows_[currentWindows]->put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
260      windows_[currentWindows]->put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
261
262// release lock
263      CTimer::get("getBufferFromClient_locked").suspend() ;   
264      unlockBuffer() ;
265
266      ok=true ;
267      char checksum=0 ;
268      for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ;
269      char checksumFirst=0 ;
270      for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ;
271      char checksumLast=0 ;
272      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ;
273     
274      info(40)<<"getBufferFromClient timeLine==clientTimeLine: clientRank "<<clientRank_<<" timeline "<<timeLine<<" clientTimeline "
275              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" "
276              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" " 
277              <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ;
278
279    }
280    else
281    {
282      count=0 ;
283 
284 // release lock
285      CTimer::get("getBufferFromClient_locked").suspend() ; 
286      unlockBuffer() ;
287    }
288    CTimer::get("getBufferFromClient").suspend() ; 
289    info(100)<<"getBufferFromClient : check data in client buffer ==> done"<<endl ;
290
291    if (ok) return true ;
292
293    return false ;
294  }
295 
296  void CServerBuffer::lockBuffer(void)
297  {
298    if (!hasWindows) return ;
299    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
300    windows_[currentWindows]->lockExclusive(windowsRank_) ;
301  }
302
303  void CServerBuffer::unlockBuffer(void)
304  {
305    if (!hasWindows) return ;
306    windows_[currentWindows]->unlockExclusive(windowsRank_) ;
307  }
308 
309  void CServerBuffer::notifyClientFinalize(void)
310  {
311    if (!hasWindows) return ;
312    size_t notify=notifyFinalize_ ;
313    lockBuffer(); 
314// lock is acquired
315    windows_[currentWindows]->put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
316    unlockBuffer() ;
317  }
318
319  void CServerBuffer::notifyBufferResizing(void)
320  {
321    resizingBuffer_=true ;
322    if (!hasWindows) return ;
323    size_t notify=notifyResizeBuffer_ ;
324    lockBuffer(); 
325// lock is acquired
326    windows_[currentWindows]->put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT) ;
327    unlockBuffer() ;
328  }
329}
Note: See TracBrowser for help on using the repository browser.