source: XIOS3/trunk/src/transport/one_sided_server_buffer.hpp

Last change on this file was 2565, checked in by jderouillat, 10 months ago

Clean buffers of servers in One Sided protocol

  • Property svn:executable set to *
File size: 5.3 KB
Line 
1#ifndef __ONE_SIDED_SERVER_BUFFER_HPP__
2#define __ONE_SIDED_SERVER_BUFFER_HPP__
3
4#include "xios_spl.hpp"
5#include "buffer_out.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include "event_server.hpp"
9#include "one_sided_cs_buffer_base.hpp"
10#include "one_sided_server_base.hpp"
11
12namespace xios
13{
14  extern CLogType logProtocol ;
15
16  class COneSidedServerBuffer : public COneSidedCSBufferBase, public COneSidedServerBase
17  {
18    private :
19
20      class CBuffer
21      {
22        char* buffer_ ;
23        size_t start_ ;
24        size_t end_ ;
25        size_t count_ ;
26        size_t size_ ;
27        bool fixed_ ;
28
29        public:
30          CBuffer(size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed) 
31          { 
32            MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer_) ;
33            info(logProtocol)<<"New buffer of size="<<size<<endl ;
34          }
35          ~CBuffer() 
36          { 
37            if (count_>0) ERROR("COneSidedServerBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ;
38            MPI_Free_mem(buffer_) ;
39          }
40       
41          void reserve(size_t& size, size_t& start, size_t& count)
42          {
43            count = 0 ;
44            if (end_ < start_)
45            {
46              if (start_-end_ >= size)
47              {
48                count=size ;
49                size = 0 ;
50                start=end_ ;
51                end_  += count ; 
52                count_+= count ;
53              } 
54              else
55              {
56                count = start_-end_ ;
57                size -= count ;
58                start=end_ ;
59                end_ = start_ ;
60                count_+=count ;
61              }
62            }
63            else if ( end_> start_ )
64            {
65              if (size_-end_ >= size)
66              {
67                count = size ;
68                size = 0;
69                start=end_ ;
70                end_   += count ;
71                count_ += count ;
72              }
73              else
74              {
75                count = size_ - end_ ;
76                size -= count ;
77                start=end_ ;
78                end_ = 0 ;
79                count_+= count ;
80              }
81            }
82            else if (end_==start_)
83            {
84              count = 0 ;
85            }
86          }
87
88          void free(size_t start, size_t count)
89          {
90            start_ = start+count-1 ;
91            count_ -= count ;
92          }
93
94          size_t remain(void) { return size_-count_; }
95          size_t getSize(void) { return size_ ;}
96          size_t getCount(void) {return count_ ;}
97          size_t isFixed(void) {return fixed_;}
98          char* getBuffer(void) {return buffer_ ;}
99      } ;
100     
101    public:
102   
103      COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
104                             map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer)  ;
105     
106      ~COneSidedServerBuffer()
107      {
108          while (!buffers_.empty()) {
109              delete buffers_.front();
110              buffers_.pop_front() ; // if buffer is empty free buffer
111          }
112      };
113
114      void receivedRequest(vector<char>& buffer) ;
115      void eventLoop(void) ;
116      void fillEventServer(size_t timeline, CEventServer& event) ;
117      void notifyClientFinalize(void);
118
119    private:
120      struct SBloc
121      {
122        CBuffer* buffer ;
123        size_t start ;
124        int count ;
125        MPI_Aint addr ;
126      } ;
127
128      void createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) ;
129      void newBuffer(size_t size, bool fixed) { buffers_.push_back(new CBuffer(size, fixed)); currentBuffer_=buffers_.back() ;}
130      void testPendingRequests(void) ;
131      void transferEvents(void) ;
132      void transferEvent(void) ;
133      void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ;
134      size_t remainSize(void) ;
135
136
137
138      bool fixed_=false;
139      size_t fixedSize_ = 0 ;
140      size_t currentBufferSize_=0 ;
141      double growingFactor_ = 1.2 ;
142      double bufferServerFactor_=1. ;
143     
144      std::list<CBuffer*> buffers_ ;
145      CBuffer* currentBuffer_=nullptr ;
146
147      map<size_t, SPendingEvent>& pendingFullEvents_ ;
148      map<size_t, SPendingEvent>& completedFullEvents_ ;
149
150      map<size_t, int> nbSenders_ ;
151      map<size_t, list<tuple<MPI_Aint,int,int>>> pendingBlocs_;
152     
153      vector<MPI_Request> pendingRmaRequests_ ;
154      vector<MPI_Status> pendingRmaStatus_ ;
155      vector<int> pendingRmaCount_ ;
156
157      map<size_t, list<SBloc>> onTransferEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
158      map<size_t, list<SBloc>> completedEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
159      list<std::pair<size_t,size_t>> bufferResize_ ; // list<size_t AssociatedTimeline, size_t newSize>
160
161      int clientRank_ ;
162      MPI_Aint * control_ ;
163      MPI_Aint controlAddr_ ;
164
165      MPI_Comm winComm_ ;
166      vector<MPI_Win> windows_ ;
167      int maxWindows_ ;
168      set<int> windowsLocked_ ;
169
170      MPI_Win winControl_ ;
171      bool isLocked_=false ;
172      const int windowRank_=0 ;
173      MPI_Aint lastBlocToFree_=0 ;
174
175  } ;
176
177}
178
179
180#endif
Note: See TracBrowser for help on using the repository browser.