source: XIOS3/trunk/src/transport/one_sided_server_buffer.hpp @ 2433

Last change on this file since 2433 was 2399, checked in by ymipsl, 22 months ago

-Fix performance issue in one_sided protocol

  • better timer instrumentation of the protocol

YM

  • Property svn:executable set to *
File size: 5.1 KB
Line 
1#ifndef __ONE_SIDED_SERVER_BUFFER_HPP__
2#define __ONE_SIDED_SERVER_BUFFER_HPP__
3
4#include "xios_spl.hpp"
5#include "buffer_out.hpp"
6#include "mpi.hpp"
7#include "cxios.hpp"
8#include "event_server.hpp"
9#include "one_sided_cs_buffer_base.hpp"
10#include "one_sided_server_base.hpp"
11
12namespace xios
13{
14  extern CLogType logProtocol ;
15
16  class COneSidedServerBuffer : public COneSidedCSBufferBase, public COneSidedServerBase
17  {
18    private :
19
20      class CBuffer
21      {
22        char* buffer_ ;
23        size_t start_ ;
24        size_t end_ ;
25        size_t count_ ;
26        size_t size_ ;
27        bool fixed_ ;
28
29        public:
30          CBuffer(size_t size, bool fixed) : start_(size), end_(0), count_(0), size_(size), fixed_(fixed) 
31          { 
32            MPI_Alloc_mem(size, MPI_INFO_NULL, &buffer_) ;
33            info(logProtocol)<<"New buffer of size="<<size<<endl ;
34          }
35          ~CBuffer() 
36          { 
37            if (count_>0) ERROR("COneSidedServerBuffer::~CBuffer()",<<"Try to delete buffer that is not empty"<<std::endl) ;
38            MPI_Free_mem(&buffer_) ;
39          }
40       
41          void reserve(size_t& size, size_t& start, size_t& count)
42          {
43            count = 0 ;
44            if (end_ < start_)
45            {
46              if (start_-end_ >= size)
47              {
48                count=size ;
49                size = 0 ;
50                start=end_ ;
51                end_  += count ; 
52                count_+= count ;
53              } 
54              else
55              {
56                count = start_-end_ ;
57                size -= count ;
58                start=end_ ;
59                end_ = start_ ;
60                count_+=count ;
61              }
62            }
63            else if ( end_> start_ )
64            {
65              if (size_-end_ >= size)
66              {
67                count = size ;
68                size = 0;
69                start=end_ ;
70                end_   += count ;
71                count_ += count ;
72              }
73              else
74              {
75                count = size_ - end_ ;
76                size -= count ;
77                start=end_ ;
78                end_ = 0 ;
79                count_+= count ;
80              }
81            }
82            else if (end_==start_)
83            {
84              count = 0 ;
85            }
86          }
87
88          void free(size_t start, size_t count)
89          {
90            start_ = start+count-1 ;
91            count_ -= count ;
92          }
93
94          size_t remain(void) { return size_-count_; }
95          size_t getSize(void) { return size_ ;}
96          size_t getCount(void) {return count_ ;}
97          size_t isFixed(void) {return fixed_;}
98          char* getBuffer(void) {return buffer_ ;}
99      } ;
100     
101    public:
102   
103      COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
104                             map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer)  ;
105     
106      void receivedRequest(vector<char>& buffer) ;
107      void eventLoop(void) ;
108      void fillEventServer(size_t timeline, CEventServer& event) ;
109      void notifyClientFinalize(void);
110
111    private:
112      struct SBloc
113      {
114        CBuffer* buffer ;
115        size_t start ;
116        int count ;
117        MPI_Aint addr ;
118      } ;
119
120      void createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged) ;
121      void newBuffer(size_t size, bool fixed) { buffers_.push_back(new CBuffer(size, fixed)); currentBuffer_=buffers_.back() ;}
122      void testPendingRequests(void) ;
123      void transferEvents(void) ;
124      void transferEvent(void) ;
125      void transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window) ;
126      size_t remainSize(void) ;
127
128
129
130      bool fixed_=false;
131      size_t fixedSize_ = 0 ;
132      size_t currentBufferSize_=0 ;
133      double growingFactor_ = 2. ;
134      double bufferServerFactor_=10. ;
135     
136      std::list<CBuffer*> buffers_ ;
137      CBuffer* currentBuffer_=nullptr ;
138
139      map<size_t, SPendingEvent>& pendingFullEvents_ ;
140      map<size_t, SPendingEvent>& completedFullEvents_ ;
141
142      map<size_t, int> nbSenders_ ;
143      map<size_t, list<tuple<MPI_Aint,int,int>>> pendingBlocs_;
144     
145      vector<MPI_Request> pendingRmaRequests_ ;
146      vector<MPI_Status> pendingRmaStatus_ ;
147      vector<int> pendingRmaCount_ ;
148
149      map<size_t, list<SBloc>> onTransferEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
150      map<size_t, list<SBloc>> completedEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>>
151      list<std::pair<size_t,size_t>> bufferResize_ ; // list<size_t AssociatedTimeline, size_t newSize>
152
153      int clientRank_ ;
154      MPI_Aint * control_ ;
155      MPI_Aint controlAddr_ ;
156
157      MPI_Comm winComm_ ;
158      vector<MPI_Win> windows_ ;
159      int maxWindows_ ;
160      set<int> windowsLocked_ ;
161
162      MPI_Win winControl_ ;
163      bool isLocked_=false ;
164      const int windowRank_=0 ;
165      MPI_Aint lastBlocToFree_=0 ;
166
167  } ;
168
169}
170
171
172#endif
Note: See TracBrowser for help on using the repository browser.