source: XIOS/dev/dev_ym/XIOS_ONE_SIDED/src/buffer_client.cpp @ 1547

Last change on this file since 1547 was 1547, checked in by ymipsl, 6 years ago

New communication protocol between clients and servers, using a hybrid mode of point-to-point mixed with one-sided communication in order to avoid deadlocks. The constraint on the maximum number of events that can be buffered on the client side is lifted.

A dev branch has been created to be tested before merging.

YM

  • Property copyright set to
    Software name : XIOS (Xml I/O Server)
    http://forge.ipsl.jussieu.fr/ioserver
    Creation date : January 2009
    Licence : CeCILL version 2
    see license file in root directory : Licence_CeCILL_V2-en.txt
    or http://www.cecill.info/licences/Licence_CeCILL_V2-en.html
    Holder : CEA/LSCE (Laboratoire des Sciences du Climat et de l'Environnement)
    CNRS/IPSL (Institut Pierre Simon Laplace)
    Project Manager : Yann Meurdesoif
    yann.meurdesoif@cea.fr
  • Property svn:eol-style set to native
File size: 5.6 KB
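
The hybrid scheme described in the changeset above combines the two MPI transfer modes: event data still travels through point-to-point sends (the MPI_Issend in checkBuffer below), while the small header at the start of each buffer (firstTimeLine, bufferCount, control) is exposed through an MPI window so the server side can access it with one-sided operations instead of waiting on a matching call. Below is a minimal illustrative sketch of that handshake, not part of this file; the helper names, the clientRank parameter and the server-side read are assumptions made only for illustration.

// Minimal sketch (assumptions noted above): a three-word header exposed
// through an MPI window, readable by the server with one-sided operations.
#include <mpi.h>
#include <cstddef>

static const std::size_t headerWords = 3;                   // firstTimeLine, bufferCount, control
static const std::size_t headerSize  = headerWords * sizeof(std::size_t);

// Client side: allocate a buffer preceded by the header and expose it.
void exposeHeader(MPI_Comm oneSidedComm, std::size_t bufferSize, char*& base, MPI_Win& win)
{
  MPI_Alloc_mem(bufferSize + headerSize, MPI_INFO_NULL, &base);
  std::size_t* header = reinterpret_cast<std::size_t*>(base);
  header[0] = 0;                                            // firstTimeLine
  header[1] = 0;                                            // bufferCount
  header[2] = 0;                                            // control
  MPI_Win_create(base, bufferSize + headerSize, 1, MPI_INFO_NULL, oneSidedComm, &win);
}

// Server side (hypothetical): read the header without any matching call on
// the client, so neither side blocks waiting for the other.
void readHeader(MPI_Win win, int clientRank, std::size_t header[3])
{
  MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank, 0, win);
  MPI_Get(header, 3 * sizeof(std::size_t), MPI_BYTE,
          clientRank, 0, 3 * sizeof(std::size_t), MPI_BYTE, win);
  MPI_Win_unlock(clientRank, win);                          // unlock completes the MPI_Get
}

The same client-side pattern appears in the file below: MPI_Alloc_mem in the constructor, MPI_Win_create in createWindows, and MPI_Win_lock/MPI_Win_unlock around every header update.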
#include "xios_spl.hpp"
#include "exception.hpp"
#include "log.hpp"
#include "buffer_out.hpp"
#include "buffer_client.hpp"
#include "cxios.hpp"
#include "mpi.hpp"
#include "tracer.hpp"

namespace xios
{
  size_t CClientBuffer::maxRequestSize = 0;

  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
    : interComm(interComm)
    , serverRank(serverRank)
    , bufferSize(bufferSize)
    , estimatedMaxEventSize(estimatedMaxEventSize)
    , maxEventSize(0)
    , current(0)
    , count(0)
    , pending(false)
    , hasWindows(false)
  {
    MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[0]) ;
    MPI_Alloc_mem(bufferSize+headerSize, MPI_INFO_NULL, &bufferHeader[1]) ;

    // Each buffer is preceded by a header holding three size_t words:
    // firstTimeLine, bufferCount and control.
    buffer[0] = bufferHeader[0]+headerSize ;
    buffer[1] = bufferHeader[1]+headerSize ;
    firstTimeLine[0]=(size_t*)bufferHeader[0] ;
    firstTimeLine[1]=(size_t*)bufferHeader[1] ;
    bufferCount[0]=(size_t*)bufferHeader[0] +1 ;
    bufferCount[1]=(size_t*)bufferHeader[1] +1 ;
    control[0]=(size_t*)bufferHeader[0] +2 ;
    control[1]=(size_t*)bufferHeader[1] +2 ;

    *firstTimeLine[0]=0 ;
    *firstTimeLine[1]=0 ;
    *bufferCount[0]=0 ;
    *bufferCount[1]=0 ;
    *control[0]=0 ;
    *control[1]=0 ;
    winState[0]=false ;
    winState[1]=false ;
    retBuffer = new CBufferOut(buffer[current], bufferSize);
    info(10) << "CClientBuffer: allocated 2 x " << bufferSize << " bytes for server " << serverRank << endl;
  }

  CClientBuffer::~CClientBuffer()
  {
    freeWindows() ;
    MPI_Free_mem(bufferHeader[0]) ;
    MPI_Free_mem(bufferHeader[1]) ;
    delete retBuffer;
  }

  void CClientBuffer::createWindows(MPI_Comm oneSidedComm)
  {
    MPI_Barrier(oneSidedComm) ;
    MPI_Win_create(bufferHeader[0], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[0])) ;
    MPI_Win_create(bufferHeader[1], bufferSize+headerSize, 1, MPI_INFO_NULL, oneSidedComm, &(windows[1])) ;

    // Reset both headers under an exclusive lock before the windows are used.
    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ;
    *firstTimeLine[0]=0 ;
    *bufferCount[0]=0 ;
    *control[0]=0 ;
    MPI_Win_unlock(0, windows[0]) ;

    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ;
    *firstTimeLine[1]=0 ;
    *bufferCount[1]=0 ;
    *control[1]=0 ;
    MPI_Win_unlock(0, windows[1]) ;
    winState[0]=false ;
    winState[1]=false ;
    MPI_Barrier(oneSidedComm) ;
    hasWindows=true ;
  }

  void CClientBuffer::freeWindows()
  {
    if (hasWindows)
    {
      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[0]) ;
      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[1]) ;
      *control[0]=2 ;
      *control[1]=2 ;
      MPI_Win_unlock(0, windows[1]) ;
      MPI_Win_unlock(0, windows[0]) ;

      MPI_Win_free(&windows[0]) ;
      MPI_Win_free(&windows[1]) ;
      hasWindows=false ;
    }
  }

  void CClientBuffer::lockBuffer(void)
  {
    if (hasWindows)
    {
      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, windows[current]) ;
      winState[current]=true ;
    }
  }

  void CClientBuffer::unlockBuffer(void)
  {
    if (hasWindows)
    {
      MPI_Win_unlock(0, windows[current]) ;
      winState[current]=false ;
    }
  }

  StdSize CClientBuffer::remain(void)
  {
    return bufferSize - count;
  }

  bool CClientBuffer::isBufferFree(StdSize size)
  {
    // Busy-wait until the control word of the current buffer is cleared,
    // keeping the window locked once it is.
    bool loop=true ;
    while (loop)
    {
      lockBuffer();
      if (*control[current]==0) loop=false ;
      else unlockBuffer() ;
    }

    if (size > bufferSize)
      ERROR("bool CClientBuffer::isBufferFree(StdSize size)",
            << "The requested size (" << size << " bytes) is too big to fit in the buffer (" << bufferSize << " bytes), please increase the client buffer size." << endl);

    if (size > maxEventSize)
    {
      maxEventSize = size;

      if (size > estimatedMaxEventSize)
        error(0) << "WARNING: Unexpected event of size " << size << " for server " << serverRank
                 << " (estimated max event size = " << estimatedMaxEventSize << ")" << std::endl;

      if (size > maxRequestSize) maxRequestSize = size;
    }

    count=*bufferCount[current] ;
    return (size <= remain());
  }

  CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)
  {
    if (size <= remain())
    {
      info(100)<<"count "<<count<<"   bufferCount[current]  "<<*bufferCount[current]<<endl ;
      retBuffer->realloc(buffer[current] + count, size);
      count += size;
      if (*firstTimeLine[current]==0) *firstTimeLine[current]=timeLine ;
      *bufferCount[current]=count ;
      return retBuffer;
    }
    else
    {
      ERROR("CBufferOut* CClientBuffer::getBuffer(size_t timeLine, StdSize size)",
            << "Not enough space in buffer, this should not have happened...");
      return NULL;
    }
  }

  bool CClientBuffer::checkBuffer(bool send)
  {
    MPI_Status status;
    int flag;

    if (pending)
    {
      traceOff();
      MPI_Test(&request, &flag, &status);
      traceOn();
      if (flag == true) pending = false;
    }

    if (!pending)
    {
      if (!send) return false ;
      if (count > 0)
      {
        lockBuffer() ;
        if (*control[current]==0 && *bufferCount[current] > 0)
        {
          MPI_Issend(buffer[current], count, MPI_CHAR, serverRank, 20, interComm, &request);
          pending = true;
          *control[current]=0 ;
          *firstTimeLine[current]=0 ;
          *bufferCount[current]=0 ;

          unlockBuffer() ;

          // Swap to the other buffer for subsequent events.
          if (current == 1) current = 0;
          else current = 1;
          count = 0;
        }
        else unlockBuffer() ;
      }
    }

    return pending;
  }

  bool CClientBuffer::hasPendingRequest(void)
  {
    lockBuffer() ;
    count=*bufferCount[current] ;
    unlockBuffer() ;

    return (pending || count > 0);
  }

}