source: XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp @ 1853

Last change on this file since 1853 was 1757, checked in by ymipsl, 5 years ago

Implement one-sided communication in the client/server protocol to avoid deadlock when some buffers are full.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 10.1 KB
Line 
1#include "xios_spl.hpp"
2#include "exception.hpp"
3#include "log.hpp"
4#include "buffer_out.hpp"
5#include "buffer_client.hpp"
6#include "cxios.hpp"
7#include "mpi.hpp"
8#include "tracer.hpp"
9
10namespace xios
11{
12  size_t CClientBuffer::maxRequestSize = 0;
13
14  CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
15    : interComm(interComm)
16    , clientRank_(clientRank)
17    , serverRank(serverRank)
18    , bufferSize(bufferSize)
19    , estimatedMaxEventSize(estimatedMaxEventSize)
20    , maxEventSize(0)
21    , current(0)
22    , count(0)
23    , pending(false)
24    , hasWindows(false) 
25    , windows_(windows)
26  {
27    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
28    else hasWindows=true ;
29
30      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ;
31      MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ;
32      buffer[0] = bufferHeader[0]+headerSize ;
33      buffer[1] = bufferHeader[1]+headerSize ;
34      firstTimeLine[0]=(size_t*)bufferHeader[0] ;
35      firstTimeLine[1]=(size_t*)bufferHeader[1] ;
36      bufferCount[0]=(size_t*)bufferHeader[0] +1 ;
37      bufferCount[1]=(size_t*)bufferHeader[1] +1 ;
38      control[0]=(size_t*)bufferHeader[0] +2 ;
39      control[1]=(size_t*)bufferHeader[1] +2 ;
40      finalize[0]=(size_t*)bufferHeader[0] +3 ;
41      finalize[1]=(size_t*)bufferHeader[1] +3 ;
42
43      *firstTimeLine[0]=0 ;
44      *firstTimeLine[1]=0 ;
45      *bufferCount[0]=0 ;
46      *bufferCount[1]=0 ;
47      *control[0]=0 ;
48      *control[1]=0 ;
49      *finalize[0]=0 ;
50      *finalize[1]=0 ;
51      winState[0]=false ;
52      winState[1]=false ;
53
54
55    if (hasWindows)
56    { 
57   
58      MPI_Win_attach(windows_[0], bufferHeader[0], bufferSize+headerSize) ;
59      MPI_Win_attach(windows_[1], bufferHeader[1], bufferSize+headerSize) ;
60   
61      MPI_Group group ;
62      int groupSize,groupRank ;
63      MPI_Win_get_group(windows_[0], &group) ;
64      MPI_Group_size(group, &groupSize) ;
65      MPI_Group_rank(group, &groupRank) ;
66      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
67
68      MPI_Win_get_group(windows_[1], &group) ;
69      MPI_Group_size(group, &groupSize) ;
70      MPI_Group_rank(group, &groupRank) ;
71      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
72
73      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
74      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
75
76      MPI_Win_unlock(clientRank_, windows_[1]) ;
77      MPI_Win_unlock(clientRank_, windows_[0]) ;
78    } 
79    retBuffer = new CBufferOut(buffer[current], bufferSize);
80    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
81  }
82
83  MPI_Aint CClientBuffer::getWinAddress(int i)
84  {
85     MPI_Aint address ;
86     
87     if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ;
88     else address=0 ;
89
90     return address ;
91  }
92
93  CClientBuffer::~CClientBuffer()
94  {
95     //freeWindows() ;
96     if (hasWindows)
97     {
98       MPI_Win_detach(windows_[0],bufferHeader[0]);
99       MPI_Win_detach(windows_[1],bufferHeader[1]);
100       MPI_Free_mem(bufferHeader[0]) ;
101       MPI_Free_mem(bufferHeader[1]) ;
102     }
103     delete retBuffer;
104  }
105
106/*  void CClientBuffer::createWindows(MPI_Comm oneSidedComm)
107  {
108    MPI_Barrier(oneSidedComm) ;
109    MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;
110    MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;
111
112    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ;
113    *firstTimeLine[0]=0 ;
114    *bufferCount[0]=0 ;
115    *control[0]=0 ;
116    MPI_Win_unlock(0, windows[0]) ;
117
118    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ;
119    *firstTimeLine[1]=0 ;
120    *bufferCount[1]=0 ;
121    *control[1]=0 ;
122    MPI_Win_unlock(0, windows[1]) ;
123    winState[0]=false ;
124    winState[1]=false ;
125    MPI_Barrier(oneSidedComm) ;
126    hasWindows=true ;
127  }
128*/
129
130/* 
131  void CClientBuffer::freeWindows()
132  {
133    if (hasWindows)
134    {
135      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[0]) ;
136      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows_[1]) ;
137      *control[0]=2 ;
138      *control[1]=2 ;
139      MPI_Win_unlock(0, windows_[1]) ;
140      MPI_Win_unlock(0, windows_[0]) ;
141     
142      MPI_Win_free(&windows_[0]) ;
143      MPI_Win_free(&windows_[1]) ;
144      hasWindows=false ;
145    }
146  }
147*/ 
148  void CClientBuffer::lockBuffer(void)
149  {
150    if (hasWindows)
151    {
152   //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[current]) ;
153      long long int lock=1 ;
154      long long int zero=0, one=1 ;
155     
156      MPI_Win_lock(MPI_LOCK_EXCLUSIVE,clientRank_, 0, windows_[current]) ;
157     
158      while(lock!=0)
159      {
160        MPI_Compare_and_swap(&one, &zero, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),
161                             windows_[current]) ;
162        MPI_Win_flush(clientRank_, windows_[current]) ;
163      }
164
165//      info(100)<<"Buffer locked "<<&windows_<<"  "<<current<<endl ;
166      winState[current]=true ;
167    }
168  }
169
170  void CClientBuffer::unlockBuffer(void)
171  {
172    if (hasWindows)
173    {
174      long long int lock=1 ;
175      long long int zero=0, one=1 ;
176
177      MPI_Compare_and_swap(&zero, &one, &lock, MPI_LONG_LONG_INT, clientRank_, MPI_Aint_add(getWinAddress(current),2*sizeof(size_t)),
178                             windows_[current]) ;
179      MPI_Win_unlock(clientRank_, windows_[current]) ;
180
181 //     info(100)<<"Buffer unlocked "<<&windows_<<"  "<<current<<endl ;
182      winState[current]=false ;
183    }
184  }
185
186  StdSize CClientBuffer::remain(void)
187  {
188    return bufferSize - count;
189  }
190
191  bool CClientBuffer::isBufferFree(StdSize size)
192  {
193//    bool loop=true ;
194//    while (loop)
195//    {
196//      lockBuffer();
197//      if (*control[current]==0) loop=false ; // attemp to read from server ?
198//      else unlockBuffer() ;
199//    }
200 
201    lockBuffer();
202    if (size > bufferSize)
203      ERROR("bool CClientBuffer::isBufferFree(StdSize size)",
204            << "The requested size (" << size << " bytes) is too big to fit the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl);
205
206    if (size > maxEventSize)
207    {
208      maxEventSize = size;
209
210      if (size > estimatedMaxEventSize)
211        error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank
212                 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;
213
214      if (size > maxRequestSize) maxRequestSize = size;
215    }
216
217      count=*bufferCount[current] ;
218      return (size <= remain());
219  }
220
221
222  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
223  {
224    if (size <= remain())
225    {
226      retBuffer->realloc(buffer[current] + count, size);
227      count += size;
228      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
229      *bufferCount[current]=count ;
230/*      info(50)<<"CClientBuffer::getBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current
231              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;
232      if (!winState[current]) info(40)<<"CClientBuffer::getBuffer "<<" Windows Not Locked... "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current
233              <<" size "<<size<<" timeLine "<< timeLine <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<endl ;*/
234      return retBuffer;
235    }
236    else
237    {
238      ERROR("CBufferOut* CClientBuffer::getBuffer(StdSize size)",
239            << "Not enough space in buffer, this should not have happened...");
240      return NULL;
241    }
242  }
243
244  void CClientBuffer::infoBuffer(void)
245  {
246     
247      char checksum=0 ;
248      for(size_t i=0;i<*bufferCount[current];i++) checksum=checksum+buffer[current][i] ;
249 
250      char checksumFirst=0 ;
251      for(size_t i=5; i<10 && i<*bufferCount[current] ;i++) checksumFirst=checksumFirst+buffer[current][i] ;
252 
253      char checksumLast=0 ;
254      for(size_t i=(*bufferCount[current]<10)?0:*bufferCount[current]-10; i<*bufferCount[current] ; i++) checksumLast=checksumLast+buffer[current][i] ;
255 
256      info(45)<<"CClientBuffer::infoBuffer "<<" clientRank_ "<<clientRank_<<" serverRank "<<serverRank <<" current "<<current<<" WinState "<<winState[current]
257              <<" firstTimeLine "<<*firstTimeLine[current]<<" count "<<*bufferCount[current]<<" checksum "<<(int)checksum<<" "
258              <<(int)buffer[current][0]<<" "<<(int)buffer[current][1]<<" "<<(int)buffer[current][2]<<" "<<(int)buffer[current][3]<<" "<<(int)buffer[current][4]<<" "<<(int)buffer[current][5]<<" "
259              <<(int)buffer[current][6]<<" "<<(int)buffer[current][7]<<" "<<(int)buffer[current][8]<<" "<<(int)buffer[current][9]<<" "<<(int)buffer[current][10]<<" "<<(int)buffer[current][11]<<endl ;
260
261  }
262
263  bool CClientBuffer::checkBuffer(bool send)
264  {
265    MPI_Status status;
266    int flag;
267
268    if (pending)
269    {
270      traceOff();
271      MPI_Test(&request, &flag, &status);
272      traceOn();
273      if (flag == true) pending = false;
274    }
275
276    if (!pending)
277    {
278      if (!send) return false ;
279      if (count > 0)
280      {
281        lockBuffer() ;
282 //       if (*control[current]==0 && bufferCount[current] > 0)
283        if (*bufferCount[current] > 0)
284        {
285          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
286          pending = true;
287//          *control[current]=0 ;
288          *firstTimeLine[current]=0 ;
289          *bufferCount[current]=0 ;
290
291           unlockBuffer() ;
292
293          if (current == 1) current = 0;
294          else current = 1;
295          count = 0;
296        }
297        else unlockBuffer() ;
298      }
299    }
300
301    return pending;
302  }
303
304  bool CClientBuffer::hasPendingRequest(void)
305  {
306   
307    lockBuffer() ;
308    count=*bufferCount[current] ;
309    unlockBuffer() ;
310
311    return (pending || count > 0);
312  }
313
314  bool CClientBuffer::isNotifiedFinalized(void)
315  {
316   
317    bool ret ;
318    lockBuffer() ;
319    ret=*finalize[current] == 1 ? true : false ;
320    unlockBuffer() ;
321
322    return ret;
323  }
324
325}
Note: See TracBrowser for help on using the repository browser.