source: XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_server.cpp @ 2246

Last change on this file since 2246 was 2246, checked in by ymipsl, 3 years ago
  • Update of the tranfer protocol using one sided communication
  • Introduce MPI_Improb/MPI_mrecv to listen incomming request
  • Introducing latency when looping over managers

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCCIL version2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du CLimat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 7.4 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "buffer_server.hpp"
4#include "timer.hpp"
5
6
7namespace xios
8{
9
10  CServerBuffer::CServerBuffer(vector<MPI_Win>& windows, vector<MPI_Aint>& winAddress, int windowsRank, StdSize buffSize) 
11  : hasWindows(true), windows_(windows), windowsRank_(windowsRank), winAddress_(winAddress)
12  {
13    size = 3 * buffSize;
14    first = 0;
15    current = 1;
16    end = size;
17    used=0 ;
18    MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer) ;
19    currentWindows=1 ;
20    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
21  }
22
23  CServerBuffer::~CServerBuffer()
24  {
25    MPI_Free_mem(buffer) ;
26  }
27
28  void CServerBuffer::updateCurrentWindows(void)
29  {
30    if (currentWindows==0) currentWindows=1 ;
31    else currentWindows=0 ;
32  }
33
34
35  bool CServerBuffer::isBufferFree(size_t count)
36  {
37    bool ret ;
38
39    if (count==0) return true ;
40
41    if (current>first)
42    {
43      if (current+count<size)
44      {
45        ret=true ;
46      }
47      else if (current+count==size)
48      {
49        if (first>0)
50        {
51          ret=true ;
52        }
53        else
54        {
55          ret=false ;
56        }
57      }
58      else
59      {
60        if (count<first)
61        {
62          ret=true ;
63        }
64        else
65        {
66          ret=false ;
67        }
68      }
69    }
70    else
71    {
72      if (current+count<first)
73      {
74        ret=true ;
75      }
76      else
77      {
78         ret=false ;
79      }
80    }
81
82    return ret ;
83  }
84
85  bool CServerBuffer::isBufferEmpty(void)
86  {
87    if (used==0) return true ;
88    else return false;
89  }
90
91  void* CServerBuffer::getBuffer(size_t count)
92  {
93    char* ret ;
94
95    if (count==0) return buffer+current ;
96
97    if (current>first)
98    {
99      if (current+count<size)
100      {
101        ret=buffer+current ;
102        current+=count ;
103      }
104      else if (current+count==size)
105      {
106        if (first>0)
107        {
108          ret=buffer+current ;
109          current=0 ;
110        }
111        else
112        {
113          ERROR("void* CServerBuffer::getBuffer(size_t count)",
114                 <<"cannot allocate required size in buffer") ;
115        }
116      }
117      else
118      {
119        end=current ;
120        if (count<first)
121        {
122          ret=buffer ;
123          current=count ;
124        }
125        else
126        {
127          ERROR("void* CServerBuffer::getBuffer(size_t count)",
128                 <<"cannot allocate required size in buffer") ;
129        }
130      }
131    }
132    else
133    {
134      if (current+count<first)
135      {
136        ret=buffer+current ;
137        current+=count ;
138      }
139      else
140      {
141          ERROR("void* CServerBuffer::getBuffer(size_t count)",
142                 <<"cannot allocate required size in buffer") ;
143      }
144    }
145
146    used+=count ;
147    return ret ;
148  }
149
150  void CServerBuffer::freeBuffer(size_t count)
151  {
152    if (count==0) return ;
153
154    if (first==end-1)
155    {
156      first=0 ;
157      count-- ;
158      end=size ;
159    }
160
161    if (first<=current)
162    {
163      if (first+count <current)
164      {
165        first+=count ;
166      }
167      else
168      {
169          ERROR("void CServerBuffer::freeBuffer(size_t count)",
170                 <<"cannot free required size in buffer") ;
171      }
172
173    }
174    else
175    {
176      if (first+count<end)
177      {
178        first+=count ;
179      }
180      else
181      {
182          ERROR("void CServerBuffer::freeBuffer(size_t count)",
183                 <<"cannot free required size in buffer") ;
184      }
185    }
186    used-=count ;
187  }
188
189  bool CServerBuffer::getBufferFromClient(size_t timeLine, char*& buffer, size_t& count)
190  {
191    count = -1 ;
192    if (!hasWindows || resizingBuffer_) return false ;
193    double time=MPI_Wtime() ;
194    if (time-bufferFromClientTime_ < bufferFromClientLatency_ ) return false;
195    bufferFromClientTime_ = time ;
196    CTimer::get("getBufferFromClient").resume() ;   
197    size_t clientTimeline ;
198    size_t clientCount ;
199    bool ok=false ;
200   
201    MPI_Group group ;
202    int groupSize,groupRank ;
203    MPI_Win_get_group(windows_[currentWindows], &group) ;
204    MPI_Group_size(group, &groupSize) ;
205    MPI_Group_rank(group, &groupRank) ;
206   
207    lockBuffer(); 
208    CTimer::get("getBufferFromClient_locked").resume() ;   
209// lock is acquired
210
211    MPI_Get(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
212    MPI_Get(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
213    MPI_Win_flush(windowsRank_, windows_[currentWindows]) ;
214
215    if (timeLine==clientTimeline)
216    {
217      buffer=(char*)getBuffer(clientCount) ;
218      count=clientCount ;
219      MPI_Get(buffer, clientCount, MPI_CHAR, windowsRank_, MPI_Aint_add(winAddress_[currentWindows],4*sizeof(size_t)) , clientCount, MPI_CHAR, windows_[currentWindows]) ;
220      clientTimeline = 0 ;
221      clientCount = 0 ;
222      MPI_Put(&clientTimeline, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],timeLineOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
223      MPI_Put(&clientCount, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows],countOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
224
225// release lock
226      CTimer::get("getBufferFromClient_locked").suspend() ;   
227      unlockBuffer() ;
228
229      ok=true ;
230      char checksum=0 ;
231      for(size_t i=0;i<count;i++) checksum=checksum+buffer[i] ;
232      char checksumFirst=0 ;
233      for(size_t i=5; i<10 && i<count ;i++) checksumFirst=checksumFirst+buffer[i] ;
234      char checksumLast=0 ;
235      for(size_t i=(count<10)?0:count-10; i<count ; i++) checksumLast=checksumLast+buffer[i] ;
236     
237      info(40)<<"getBufferFromClient timeLine==clientTimeLine: windowsRank "<<windowsRank_<<" timeline "<<timeLine<<" clientTimeline "
238              <<clientTimeline<<" clientCount "<<count<<" checksum "<<(int)checksum<<" "
239              <<(int)buffer[0]<<" "<<(int)buffer[1]<<" "<<(int)buffer[2]<<" "<<(int)buffer[3]<<" "<<(int)buffer[4]<<" "<<(int)buffer[5]<<" " 
240              <<(int)buffer[6]<<" "<<(int)buffer[7]<<" "<<(int)buffer[8]<<" "<<(int)buffer[9]<<" "<<(int)buffer[10]<<" "<<(int)buffer[11]<<endl ;
241
242    }
243    else
244    {
245      count=0 ;
246 
247 // release lock
248      CTimer::get("getBufferFromClient_locked").suspend() ; 
249      unlockBuffer() ;
250    }
251    CTimer::get("getBufferFromClient").suspend() ;   
252    if (ok) return true ;
253
254    return false ;
255  }
256 
257  void CServerBuffer::lockBuffer(void)
258  {
259    if (!hasWindows) return ;
260    MPI_Win_lock(MPI_LOCK_EXCLUSIVE,windowsRank_,0,windows_[currentWindows]) ;
261  }
262
263  void CServerBuffer::unlockBuffer(void)
264  {
265    if (!hasWindows) return ;
266    MPI_Win_unlock(windowsRank_,windows_[currentWindows]) ;
267  }
268 
269  void CServerBuffer::notifyClientFinalize(void)
270  {
271    if (!hasWindows) return ;
272    size_t notify=notifyFinalize_ ;
273    lockBuffer(); 
274// lock is acquired
275    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
276    unlockBuffer() ;
277  }
278
279  void CServerBuffer::notifyBufferResizing(void)
280  {
281    resizingBuffer_=true ;
282    if (!hasWindows) return ;
283    size_t notify=notifyResizeBuffer_ ;
284    lockBuffer(); 
285// lock is acquired
286    MPI_Put(&notify, 1, MPI_LONG_LONG_INT, windowsRank_ , MPI_Aint_add(winAddress_[currentWindows], notifyOffset_*sizeof(size_t)), 1, MPI_LONG_LONG_INT,windows_[currentWindows]) ;
287    unlockBuffer() ;
288  }
289}
Note: See TracBrowser for help on using the repository browser.