source: XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_server.cpp @ 1547

Last change on this file since 1547 was 1547, checked in by ymipsl, 6 years ago

New communication protocol between clients and servers, using hybrid mode of p2p mixt with one_sided communication in order to avoid dead-locking. The constraint of the maximum number of event that can be bufferized on client side is released.

Dev branch is created to be tested before merging.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 5.6 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "buffer_server.hpp"
4
5
6namespace xios
7{
8
9  CServerBuffer::CServerBuffer(StdSize buffSize) : hasWindows(false)
10  {
11    size = 3 * buffSize;
12    first = 0;
13    current = 1;
14    end = size;
15    used=0 ;
16    buffer = new char[size]; // use MPI_ALLOC_MEM later?
17    currentWindows=0 ;
18  }
19
20  CServerBuffer::~CServerBuffer()
21  {
22    delete [] buffer ;
23  }
24
25  void CServerBuffer::updateCurrentWindows(void)
26  {
27    if (currentWindows==0) currentWindows=1 ;
28    else currentWindows=0 ;
29  }
30 
31  void CServerBuffer::createWindows(MPI_Comm oneSidedComm)
32  {
33    MPI_Barrier(oneSidedComm) ;
34    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;
35    MPI_Win_create(NULL, 0, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;
36    hasWindows=true ;
37    MPI_Barrier(oneSidedComm) ;
38  }
39
40  bool CServerBuffer::freeWindows()
41  {
42    if (hasWindows)
43    {
44      size_t header[3] ;
45      size_t& control=header[2] ;
46      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[0]) ;
47      MPI_Get(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[0]) ;
48      MPI_Win_unlock(0,windows[0]) ;
49      if (control==2)  // ok for free windows
50      {
51        MPI_Win_free( &(windows[0])) ;
52        MPI_Win_free( &(windows[1])) ;
53        hasWindows=false ;
54        return true ;
55      }
56      else return false ;
57    }
58    else return true ;
59  }
60
61  bool CServerBuffer::isBufferFree(size_t count)
62  {
63    bool ret ;
64
65    if (count==0) return true ;
66
67    if (current>first)
68    {
69      if (current+count<size)
70      {
71        ret=true ;
72      }
73      else if (current+count==size)
74      {
75        if (first>0)
76        {
77          ret=true ;
78        }
79        else
80        {
81          ret=false ;
82        }
83      }
84      else
85      {
86        if (count<first)
87        {
88          ret=true ;
89        }
90        else
91        {
92          ret=false ;
93        }
94      }
95    }
96    else
97    {
98      if (current+count<first)
99      {
100        ret=true ;
101      }
102      else
103      {
104         ret=false ;
105      }
106    }
107
108    return ret ;
109  }
110
111  bool CServerBuffer::isBufferEmpty(void)
112  {
113    if (used==0) return true ;
114    else return false;
115  }
116
117  void* CServerBuffer::getBuffer(size_t count)
118  {
119    char* ret ;
120
121    if (count==0) return buffer+current ;
122
123    if (current>first)
124    {
125      if (current+count<size)
126      {
127        ret=buffer+current ;
128        current+=count ;
129      }
130      else if (current+count==size)
131      {
132        if (first>0)
133        {
134          ret=buffer+current ;
135          current=0 ;
136        }
137        else
138        {
139          ERROR("void* CServerBuffer::getBuffer(size_t count)",
140                 <<"cannot allocate required size in buffer") ;
141        }
142      }
143      else
144      {
145        end=current ;
146        if (count<first)
147        {
148          ret=buffer ;
149          current=count ;
150        }
151        else
152        {
153          ERROR("void* CServerBuffer::getBuffer(size_t count)",
154                 <<"cannot allocate required size in buffer") ;
155        }
156      }
157    }
158    else
159    {
160      if (current+count<first)
161      {
162        ret=buffer+current ;
163        current+=count ;
164      }
165      else
166      {
167          ERROR("void* CServerBuffer::getBuffer(size_t count)",
168                 <<"cannot allocate required size in buffer") ;
169      }
170    }
171
172    used+=count ;
173    return ret ;
174  }
175
176  void CServerBuffer::freeBuffer(size_t count)
177  {
178    if (count==0) return ;
179
180    if (first==end-1)
181    {
182      first=0 ;
183      count-- ;
184      end=size ;
185    }
186
187    if (first<=current)
188    {
189      if (first+count <current)
190      {
191        first+=count ;
192      }
193      else
194      {
195          ERROR("void CServerBuffer::freeBuffer(size_t count)",
196                 <<"cannot free required size in buffer") ;
197      }
198
199    }
200    else
201    {
202      if (first+count<end)
203      {
204        first+=count ;
205      }
206      else
207      {
208          ERROR("void CServerBuffer::freeBuffer(size_t count)",
209                 <<"cannot free required size in buffer") ;
210      }
211    }
212    used-=count ;
213  }
214
215  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count)
216  {
217    if (!hasWindows) return false ;
218
219   
220    size_t header[3] ;
221    size_t& clientTimeline=header[0] ;
222    size_t& clientCount=header[1] ;
223    size_t& control=header[2] ;
224    bool ok=false ;
225   
226    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ;
227
228    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, 0 , 0, 1, MPI_LONG_LONG_INT,windows[currentWindows]) ;
229    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, 0 , 1*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ;
230    control=1 ;
231    MPI_Put(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ;
232   
233    MPI_Win_unlock(0,windows[currentWindows]) ;
234
235    if (timeLine==clientTimeline)
236    {
237
238      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ;
239      buffer=(char*)getBuffer(clientCount) ;
240      count=clientCount ;
241      MPI_Get(buffer, clientCount, MPI_CHAR, 0, 3*sizeof(size_t) , clientCount, MPI_CHAR, windows[currentWindows]) ;
242      clientTimeline = 0 ;
243      clientCount = 0 ;
244      control=0 ;
245      MPI_Put(&header[0], 3, MPI_LONG_LONG_INT, 0, 0 , 3, MPI_LONG_LONG_INT,windows[currentWindows]) ;
246 
247      MPI_Win_unlock(0,windows[currentWindows]) ;
248      ok=true ;
249    }
250    else
251    {
252      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,0,0,windows[currentWindows]) ;
253      control=0 ;
254      MPI_Put(&control, 1, MPI_LONG_LONG_INT, 0 , 2*sizeof(size_t), 1, MPI_LONG_LONG_INT,windows[currentWindows]) ;
255      MPI_Win_unlock(0,windows[currentWindows]) ;
256    }
257
258    if (ok) return true ;
259
260    return false ;
261  }
262   
263   
264}
Note: See TracBrowser for help on using the repository browser.