source: XIOS3/trunk/src/transport/one_sided_server_buffer.cpp @ 2589

Last change on this file since 2589 was 2589, checked in by jderouillat, 8 months ago

Specify the usage of the xios namespace to overload the MPI funtions

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.4 KB
Line 
1#include "one_sided_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  CLogType logProtocol("log_protocol") ;
12
13  COneSidedServerBuffer::COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
14                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
15                        : clientRank_(clientRank), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
16  {
17    MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
18    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
19    bufferIn >> controlAddr_;
20    createWindow(commSelf, interCommMerged) ;
21  }
22
23  void COneSidedServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
24  {
25    CTimer::get("create Windows").resume() ;
26    MPI_Comm interComm ;
27    xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
28    xios::MPI_Intercomm_merge(interComm, true, &winComm_) ;
29    CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
30    xios::MPI_Comm_free(&interComm) ;
31   
32    maxWindows_=MAX_WINDOWS ;
33    windows_.resize(maxWindows_) ;
34   
35    for(int i=0;i<maxWindows_;++i) 
36    {
37      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
38      CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
39    }
40    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
41    CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
42    CTimer::get("create Windows").suspend() ;
43    MPI_Barrier(winComm_) ;
44    MPI_Barrier(winComm_) ;
45
46  }
47
48  void COneSidedServerBuffer::receivedRequest(vector<char>& buffer)
49  {
50    size_t timeline ;
51    int nbSenders ;
52    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
53    bufferIn >> timeline ;
54    if (timeline==EVENT_BUFFER_RESIZE)
55    {
56      size_t AssociatedTimeline ;
57      size_t newSize ;
58      bufferIn >>AssociatedTimeline>>newSize ;
59      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
60    }
61    else // receive standard event
62    {
63      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
64                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
65      bufferIn>> nbSenders ;
66      nbSenders_[timeline] = nbSenders ;
67      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
68      if (pendingFullEvent==pendingFullEvents_.end()) 
69      {
70        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
71        pendingFullEvents_[timeline]=pendingEvent ;
72      }
73      else 
74      { 
75        pendingFullEvent->second.currentNbSenders++ ;
76        pendingFullEvent->second.buffers.push_back(this) ;
77      }
78   
79      int nbBlocs ; 
80      int count ;
81      int window ;
82      bufferIn >> nbBlocs ;
83      MPI_Aint bloc ;
84      auto& blocs = pendingBlocs_[timeline] ;
85      for(int i=0;i<nbBlocs;++i) 
86      {
87        bufferIn >> bloc >> count >> window;
88        blocs.push_back({bloc, count, window}) ;
89      }
90    }
91  }
92
93  void COneSidedServerBuffer::eventLoop(void)
94  {
95    int flag ;
96    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
97    if (pendingRmaRequests_.empty()) transferEvents() ;
98
99    if (!isLocked_)
100    {
101      if (lastBlocToFree_!=0)
102      {
103        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
104        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
105        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
106        MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
107        MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
108        MPI_Win_unlock(windowRank_,winControl_) ; 
109        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
110        lastBlocToFree_ = 0 ;       
111      }
112    }
113
114    if (buffers_.size()>1) 
115    {
116      if (buffers_.front()->getCount()==0) {
117        delete buffers_.front();
118        buffers_.pop_front() ; // if buffer is empty free buffer
119      }
120    }
121  }
122
123  void COneSidedServerBuffer::notifyClientFinalize(void)
124  {
125    eventLoop() ; // to free the last bloc
126    MPI_Aint finalize=1 ;
127    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
128    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
129    MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
130    MPI_Win_unlock(windowRank_,winControl_) ; 
131  }
132  void COneSidedServerBuffer::testPendingRequests(void)
133  {
134    if (!pendingRmaRequests_.empty())
135    {
136      int flag ;   
137
138      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
139      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
140      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
141     
142      if (flag==true) 
143      {
144        if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
145        for(auto& win : windowsLocked_) 
146        {
147          info(logProtocol)<<"unlock window "<<win<<endl ;
148          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
149          MPI_Win_unlock(windowRank_,windows_[win]) ; 
150          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
151        }
152        windowsLocked_.clear() ;
153       
154
155        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
156        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
157       
158        size_t transferedSize = 0 ;
159        for(auto& count : pendingRmaCount_) transferedSize+=count ;
160
161        if (info.isActive(logProtocol))
162        {
163          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
164          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
165                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
166                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
167          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
168         }
169
170        isLocked_=false ;
171        pendingRmaRequests_.clear() ;
172        pendingRmaStatus_.clear() ;
173        pendingRmaCount_.clear() ;
174        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
175       
176        for(auto & event : onTransferEvents_) 
177        {
178          size_t timeline = event.first ;
179
180          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
181          pendingFullEvent->second.nbSenders-- ;
182          pendingFullEvent->second.currentNbSenders-- ;
183         
184
185          auto completedFullEvent=completedFullEvents_.find(timeline) ;
186          if (completedFullEvent==completedFullEvents_.end()) 
187          {
188            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
189            completedFullEvents_[timeline]=pendingEvent ;
190          }
191          else 
192          {
193            completedFullEvent->second.currentNbSenders++ ;
194            completedFullEvent->second.buffers.push_back(this) ;
195          }
196          nbSenders_.erase(timeline) ;
197        } 
198        onTransferEvents_.clear() ;
199      }
200    }
201
202  }
203 
204  size_t COneSidedServerBuffer::remainSize(void)
205  {
206    if (!fixed_) return std::numeric_limits<size_t>::max() ;
207    else
208    {
209      if (currentBuffer_ == nullptr) return fixedSize_ ;
210      else return currentBuffer_->remain() ;
211    }
212  }
213
214  void COneSidedServerBuffer::transferEvents(void)
215  {
216    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
217    {
218      size_t remain=remainSize() ;
219      size_t transferedSize=0 ;
220
221      size_t timeline =  pendingBlocs_.begin()->first ;
222      auto& blocs = pendingBlocs_.begin()->second ;
223     
224      if (!bufferResize_.empty()) 
225      {
226        if (bufferResize_.front().first==timeline)
227        {
228          currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
229          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
230          bufferResize_.pop_front() ;
231          newBuffer(currentBufferSize_,fixed_) ;
232        }
233      }
234
235      size_t eventSize=0 ;
236      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
237     
238      if (eventSize > remain) 
239      {
240        if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;
241        else 
242        {
243          if (currentBuffer_==nullptr) remain = eventSize ;
244          else remain = currentBuffer_->remain() + fixedSize_ ;
245        }
246      }
247     
248      if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
249     
250      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
251      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
252      for(auto& bloc : blocs) 
253      {
254        int win=get<2>(bloc) ;
255        if (windowsLocked_.count(win)==0) 
256        {
257          info(logProtocol)<<"lock window "<<win<<endl ;
258          if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
259          MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
260          if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
261          windowsLocked_.insert(win) ;
262        }
263      }
264      isLocked_=true ;
265      do
266      {
267        transferEvent() ; // ok enough storage for this bloc
268       
269        transferedSize += eventSize ;
270        pendingBlocs_.erase(pendingBlocs_.begin()) ;
271       
272        //  break ; // transfering just one event temporary => to remove
273       
274        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
275
276        timeline =  pendingBlocs_.begin()->first ;
277        auto& blocs=pendingBlocs_.begin()->second ;
278       
279        if (!bufferResize_.empty()) 
280        {
281          if (bufferResize_.front().first==timeline)
282          {
283            currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
284            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
285            bufferResize_.pop_front() ;
286            newBuffer(currentBufferSize_,fixed_) ;
287          }
288        }
289
290        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
291        if (transferedSize+eventSize<=remain)
292        {
293          for(auto& bloc : blocs) 
294          {
295            int win=get<2>(bloc) ;
296            if (windowsLocked_.count(win)==0) 
297            {
298              info(logProtocol)<<"lock window "<<win<<endl ;
299              if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
300              MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
301              if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
302              windowsLocked_.insert(win) ;
303            }
304          }
305        }
306      }
307      while(transferedSize+eventSize<=remain) ;
308     
309    }
310  }
311 
312  void COneSidedServerBuffer::transferEvent(void)
313  {
314    MPI_Aint addr;
315    MPI_Aint offset ;
316
317    size_t size;
318    size_t start;
319    size_t count;
320    int window ;
321
322    auto& blocs=pendingBlocs_.begin()->second ;
323    size_t timeline = pendingBlocs_.begin() -> first ;
324 
325
326    for(auto& bloc : blocs)
327    {
328      addr = std::get<0>(bloc) ;
329      size = std::get<1>(bloc) ;
330      window = std::get<2>(bloc) ;
331
332      offset=0 ;
333
334      do
335      {
336        if (currentBuffer_!=nullptr)
337        {
338          currentBuffer_->reserve(size, start, count) ;
339     
340          if ( count > 0)
341          {
342            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
343            offset=MPI_Aint_add(offset, count) ;
344          }
345          currentBuffer_->reserve(size, start, count) ;
346     
347          if ( count > 0)
348          {
349            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
350            offset=MPI_Aint_add(offset, count) ;
351          }
352        }
353
354        if (size>0) 
355        {
356          if (fixed_) newBuffer(fixedSize_,fixed_) ;
357          else
358          {
359            currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;
360            newBuffer(currentBufferSize_,fixed_) ;
361          }
362        }
363      } while (size > 0 ) ;
364    }
365
366    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
367  }
368
369  void COneSidedServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
370  {
371    MPI_Request request ;
372    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
373    if (info.isActive(logProtocol))
374    {
375      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ;
376      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
377               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
378    }
379    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
380    MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
381    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
382    pendingRmaRequests_.push_back(request) ;
383    pendingRmaCount_.push_back(count) ;
384    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
385  }
386
387  void COneSidedServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
388  {
389    auto &completedEvent=completedEvents_[timeline] ;
390    size_t size=0 ;
391    for(auto& bloc : completedEvent) size+=bloc.count ;
392    char* buffer = new char[size] ;
393    size=0 ;
394   
395    ostringstream outStr ;
396    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
397                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
398    int i=0 ;
399    MPI_Aint addr ;
400    for(auto& bloc : completedEvent) 
401    {
402      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
403     
404      if (info.isActive(logProtocol))
405      {
406        size_t checksum=0 ;
407        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
408        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
409        i++ ;
410      }
411
412      size+=bloc.count ;
413      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
414      addr=bloc.addr ;
415      if (bloc.buffer->getCount()==0)
416      {
417        if (buffers_.size() > 1)
418        {
419          delete buffers_.front();
420          buffers_.pop_front() ; // if buffer is empty free buffer
421        }
422      }     
423    }
424    event.push(clientRank_, nullptr, buffer, size) ;
425    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
426    info(logProtocol)<<outStr.str()<<endl ;
427   
428    lastBlocToFree_=addr ;
429    /*
430    if (!isLocked_) MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, window_) ;
431    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR) ;
432    MPI_Put(&addr, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, window_) ;
433    if (!isLocked_) MPI_Win_unlock(windowRank_,window_) ;
434   */
435    completedEvents_.erase(timeline) ;
436    eventLoop() ;
437  }
438
439 
440
441}
Note: See TracBrowser for help on using the repository browser.