source: XIOS3/trunk/src/transport/one_sided_client_buffer.hpp @ 2526

Last change on this file since 2526 was 2526, checked in by jderouillat, 12 months ago

Backport intracommunicator probing and a nonblocking communication fix for the one_sided protocol (2524-2525) in trunk

  • Property svn:executable set to *
File size: 6.5 KB
Line 
1#ifndef __ONE_SIDED_CLIENT_BUFFER_HPP__
2#define __ONE_SIDED_CLIENT_BUFFER_HPP__
3
4#include "xios_spl.hpp"
5#include "buffer_out.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include "event_client.hpp"
9#include "one_sided_cs_buffer_base.hpp"
10
11namespace xios
12{
13  extern CLogType logProtocol ;
14
15  class COneSidedClientBuffer : public COneSidedCSBufferBase
16  {
17    class CBuffer
18    {
19      char* buffer_ ;
20      size_t start_ ;
21      size_t end_ ;
22      size_t count_ ;
23      size_t size_ ;
24      bool fixed_ ;
25      MPI_Win window_ ;
26
27      public:
28        CBuffer(MPI_Win window, size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed), window_(window) 
29        { 
30          size_t trueSize = (size/8 + 1)*8 + 8 ;  // seems to have some problem with OpenMPi/UCX when trying to tranfer 1 byte
31                                                  // at the end of the buffer. Alignment problem ? Or simple bug ? Seems that allocate
32                                                  // greater buffer solves the problem
33          MPI_Alloc_mem(trueSize, MPI_INFO_NULL, &buffer_) ;
34          info(logProtocol)<<"Attach memory to windows : addr="<<(MPI_Aint)buffer_<<"   count="<<size<<endl ;
35
36          MPI_Win_attach(window_, buffer_, trueSize) ;
37         
38        }
39        ~CBuffer() 
40        { 
41          if (count_>0) ERROR("COneSidedClientBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ;
42          MPI_Win_detach(window_, buffer_) ;
43          MPI_Free_mem(buffer_) ;
44          info(logProtocol)<<"Detach memory from windows : addr="<<(MPI_Aint)buffer_<<"   count="<<size_<<endl ;
45        }
46       
47        void write(char** buffer, size_t& size, MPI_Aint& addr, size_t& start, size_t& count)
48        {
49          addr = 0 ;
50          count = 0 ;
51          if (end_ < start_)
52          {
53            if (start_-end_ >= size)
54            {
55              count=size ;
56              size = 0 ;
57              start=end_ ;
58              MPI_Get_address(&buffer_[start], &addr) ;
59              end_  += count ; 
60              count_+= count ;
61              memcpy(&buffer_[start], *buffer, count) ;
62              *buffer+=count ;
63            } 
64            else
65            {
66              count = start_-end_ ;
67              size -= count ;
68              start=end_ ;
69              MPI_Get_address(&buffer_[start], &addr) ;
70              end_ = start_ ;
71              count_+=count ;
72              memcpy(&buffer_[start], *buffer, count) ;
73              *buffer+=count ;
74            }
75          }
76          else if ( end_> start_ )
77          {
78            if (size_-end_ >= size)
79            {
80              count = size ;
81              size = 0;
82              start=end_ ;
83              MPI_Get_address(&buffer_[start], &addr) ;
84              end_   += count ;
85              count_ += count ;
86              memcpy(&buffer_[start], *buffer, count) ;
87              *buffer+=count ;
88            }
89            else
90            {
91              count = size_ - end_ ;
92              size -= count ;
93              start=end_ ;
94              MPI_Get_address(&buffer_[start], &addr) ;
95              end_ = 0 ;
96              count_+= count ;
97              memcpy(&buffer_[start], *buffer, count) ;
98              *buffer+=count ;
99            }
100          }
101          else if (end_==start_)
102          {
103            count = 0 ;
104          }
105
106          // check
107
108          if (count!=0)
109          {
110            MPI_Aint startBufferAddr,endBufferAddr ;
111            MPI_Get_address(&buffer_[0], &startBufferAddr) ;
112            MPI_Get_address(&buffer_[size_-1], &endBufferAddr) ;
113
114            if (addr<startBufferAddr || MPI_Aint_add(addr,count-1)>endBufferAddr)
115            {
116              ERROR("COneSidedClientBuffer::CBuffer::write(char** buffer, size_t& size, MPI_Aint& addr, size_t& start, size_t& count)",<<" out of bounds"<<std::endl) ;
117            }
118
119          }
120        }
121
122        void free(size_t start, size_t count)
123        {
124          start_ = start+count-1 ;
125          count_ -= count ;
126        }
127
128        size_t remain(void) { return size_-count_; }
129        size_t getSize(void) { return size_ ;}
130        size_t getCount(void) {return count_ ;}
131        size_t isFixed(void) {return fixed_;}
132    } ;
133 
134    public:
135      COneSidedClientBuffer(MPI_Comm& interComm, int serverRank, MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank) ;
136      void newBuffer(size_t size, bool fixed) ;
137      bool isBufferFree(size_t size) ;
138      int writeBuffer(char* buff, size_t size) ;
139      void freeBuffer(MPI_Aint addr) ;
140      bool freeBloc(MPI_Aint addr) ;
141      bool writeEvent(size_t timeLine, CEventClient& event) ;
142      bool isEmpty(void) { return blocs_.empty() ;}
143      bool isNotifiedFinalized(void) { eventLoop() ; return buffers_.empty() && isFinalized_ ;}
144      void setFixed(size_t size) { fixed_=true ; fixedSize_=size ;}
145      void setGrowable(double growingFactor) { fixed_= false ; growingFactor_=growingFactor;}
146      void setGrowingFactor(double growingFactor) {growingFactor_=growingFactor;}
147      void eventLoop(void) ;
148      void sendTimelineEvent(size_t timeline, int nbSenders, int nbBlocs) ;
149      void sendResizeBufferEvent(size_t timeline, size_t currentBufferSize_) ;
150      void sendNewBuffer(void) ;
151      void createWindow(MPI_Comm& commSelf, MPI_Comm& interCommMerged, int intraServerRank ) ;
152
153    private :
154     
155      struct SBloc
156      {
157        MPI_Aint addr ;
158        CBuffer* buffer ;
159        size_t start ;
160        int    count ;
161        int    window ;
162      } ;
163
164      struct SRequest
165      {
166        CBufferOut* buffer ;
167        MPI_Request mpiRequest ;
168      } ;
169     
170      MPI_Aint* control_ ;
171
172      MPI_Comm interComm_; 
173      MPI_Comm winComm_ ;
174
175      MPI_Win window_ ;
176      vector<MPI_Win> windows_ ;
177      vector<bool> usedWindows_ ;
178      int currentWindow_ ;
179      int maxWindows_ ;
180
181      MPI_Win winControl_ ;
182      int serverRank_ ;
183
184      MPI_Comm interCommMerged_; 
185      int intraServerRank_ ;
186
187      std::list<CBuffer*> buffers_ ;
188      std::list<SBloc> blocs_ ;
189      std::list<SBloc>::iterator lastBlocEvent_ ;
190      CBuffer* currentBuffer_=nullptr ;
191      std::list<SRequest> requests_ ;
192
193      bool fixed_=false;
194      size_t fixedSize_ = 0 ;
195      size_t currentBufferSize_= 0  ;
196      double growingFactor_ = 2. ; 
197      MPI_Aint lastFreedBloc_=0 ;
198      bool isFinalized_ = false ;
199
200  } ;
201
202}
203
204
205#endif
Note: See TracBrowser for help on using the repository browser.