source: XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp @ 2130

Last change on this file since 2130 was 2130, checked in by ymipsl, 3 years ago

New management of client-server buffers.

  • buffers can grow automatically in intialization phase
  • buffers is evaluated after the close context definition phase and fixed at optimal value.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 8.9 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "buffer_server.hpp"
4
5
6namespace xios
7{
8
9  CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize) 
10  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
11  {
12    size = 3 * buffSize;
13    first = 0;
14    current = 1;
15    end = size;
16    used=0 ;
17    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ;
18    currentWindows=1 ;
19    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
20  }
21
22  CServerBuffer::~CServerBuffer()
23  {
24    MPI_Free_mem(buffer) ;
25  }
26
27  void CServerBuffer::updateCurrentWindows(void)
28  {
29    if (currentWindows==0) currentWindows=1 ;
30    else currentWindows=0 ;
31  }
32
33/*
34  void CServerBuffer::createWindows(MPI_Comm oneSidedComm)
35  {
36    MPI_Barrier(oneSidedComm) ;
37    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;
38    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;
39    hasWindows=true ;
40    updateCurrentWindows() ;
41    MPI_Barrier(oneSidedComm) ;
42  }
43*/
44
45/*
46  bool CServerBuffer::freeWindows()
47  {
48    if (hasWindows)
49    {
50      size_t header[3] ;
51      size_t& control=header[2] ;
52      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows_[0]) ;
53      MPI_Get(&control, 1, MPI_LONG_LONG_INT, windowsRank , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ;
54      MPI_Win_unlock(0,windows[0]) ;
55      if (control==2)  // ok for free windows
56      {
57        MPI_Win_free( &(windows[0])) ;
58        MPI_Win_free( &(windows[1])) ;
59        hasWindows=false ;
60        return true ;
61      }
62      else return false ;
63    }
64    else return true ;
65  }
66*/
67
68  bool CServerBuffer::isBufferFree(size_t count)
69  {
70    bool ret ;
71
72    if (count==0) return true ;
73
74    if (current>first)
75    {
76      if (current+count<size)
77      {
78        ret=true ;
79      }
80      else if (current+count==size)
81      {
82        if (first>0)
83        {
84          ret=true ;
85        }
86        else
87        {
88          ret=false ;
89        }
90      }
91      else
92      {
93        if (count<first)
94        {
95          ret=true ;
96        }
97        else
98        {
99          ret=false ;
100        }
101      }
102    }
103    else
104    {
105      if (current+count<first)
106      {
107        ret=true ;
108      }
109      else
110      {
111         ret=false ;
112      }
113    }
114
115    return ret ;
116  }
117
118  bool CServerBuffer::isBufferEmpty(void)
119  {
120    if (used==0) return true ;
121    else return false;
122  }
123
124  void* CServerBuffer::getBuffer(size_t count)
125  {
126    char* ret ;
127
128    if (count==0) return buffer+current ;
129
130    if (current>first)
131    {
132      if (current+count<size)
133      {
134        ret=buffer+current ;
135        current+=count ;
136      }
137      else if (current+count==size)
138      {
139        if (first>0)
140        {
141          ret=buffer+current ;
142          current=0 ;
143        }
144        else
145        {
146          ERROR("void* CServerBuffer::getBuffer(size_t count)",
147                 <<"cannot allocate required size in buffer") ;
148        }
149      }
150      else
151      {
152        end=current ;
153        if (count<first)
154        {
155          ret=buffer ;
156          current=count ;
157        }
158        else
159        {
160          ERROR("void* CServerBuffer::getBuffer(size_t count)",
161                 <<"cannot allocate required size in buffer") ;
162        }
163      }
164    }
165    else
166    {
167      if (current+count<first)
168      {
169        ret=buffer+current ;
170        current+=count ;
171      }
172      else
173      {
174          ERROR("void* CServerBuffer::getBuffer(size_t count)",
175                 <<"cannot allocate required size in buffer") ;
176      }
177    }
178
179    used+=count ;
180    return ret ;
181  }
182
183  void CServerBuffer::freeBuffer(size_t count)
184  {
185    if (count==0) return ;
186
187    if (first==end-1)
188    {
189      first=0 ;
190      count-- ;
191      end=size ;
192    }
193
194    if (first<=current)
195    {
196      if (first+count <current)
197      {
198        first+=count ;
199      }
200      else
201      {
202          ERROR("void CServerBuffer::freeBuffer(size_t count)",
203                 <<"cannot free required size in buffer") ;
204      }
205
206    }
207    else
208    {
209      if (first+count<end)
210      {
211        first+=count ;
212      }
213      else
214      {
215          ERROR("void CServerBuffer::freeBuffer(size_t count)",
216                 <<"cannot free required size in buffer") ;
217      }
218    }
219    used-=count ;
220  }
221
222  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count)
223  {
224    if (!hasWindows || resizingBuffer_) return false ;
225
226   
227    size_t header[3] ;
228    size_t& clientTimeline=header[0] ;
229    size_t& clientCount=header[1] ;
230    size_t& control=header[2] ;
231    bool ok=false ;
232   
233    MPI_Group group ;
234    int groupSize,groupRank ;
235    MPI_Win_get_group(windows_[currentWindows], &group) ;
236    MPI_Group_size(group, &groupSize) ;
237    MPI_Group_rank(group, &groupRank) ;
238   
239    lockBuffer(); 
240
241// lock is acquired
242
243    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],0), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
244    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
245    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
246
247//    control=1 ;
248//    MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
249   
250//    MPI_Win_unlock(windowsRank_, windows_[currentWindows]) ;
251    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
252//    info(100)<<"getBufferFromClient : windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;
253    if (timeLine==clientTimeline)
254    {
255//      info(50)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "<<clientTimeline<<" clientCount "<<clientCount<<endl ;
256 
257//      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
258      buffer=(char*)getBuffer(clientCount) ;
259      count=clientCount ;
260      MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ;
261      clientTimeline = 0 ;
262      clientCount = 0 ;
263//      control=0 ;
264      MPI_Put(&header[0], 2, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],0) , 2, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
265
266// release lock
267     unlockBuffer() ;
268
269      ok=true ;
270      char checksum=0 ;
271      for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ;
272      char checksumFirst=0 ;
273      for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ;
274      char checksumLast=0 ;
275      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ;
276     
277      info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "
278              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" "
279              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" " 
280              <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ;
281
282    }
283    else
284    {
285      //MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
286      //control=0 ;
287      //MPI_Put(&control, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
288 
289 // release lock
290      unlockBuffer() ;
291    }
292
293    if (ok) return true ;
294
295    return false ;
296  }
297 
298  void CServerBuffer::lockBuffer(void)
299  {
300    if (!hasWindows) return ;
301
302    long long int lock=1 ;
303    long long int zero=0, one=1 ;
304//    control=1 ;
305    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
306    while(lock!=0)
307    {
308      MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),
309                           windows_[currentWindows]) ;
310      MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
311    }
312  }
313
314  void CServerBuffer::unlockBuffer(void)
315  {
316    if (!hasWindows) return ;
317    long long int lock=1 ;
318    long long int zero=0, one=1 ;
319   
320    MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],2*sizeof(size_t)),
321                          windows_[currentWindows]) ;
322    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ; 
323    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ;
324  }
325 
326  void CServerBuffer::notifyClientFinalize(void)
327  {
328    if (!hasWindows) return ;
329    size_t finalize=1 ;
330    lockBuffer(); 
331// lock is acquired
332    MPI_Put(&finalize, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],3*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
333    unlockBuffer() ;
334  }
335}
Note: See TracBrowser for help on using the repository browser.