source: XIOS3/trunk/src/transport/one_sided_server_buffer.cpp

Last change on this file was 2628, checked in by jderouillat, 2 months ago

New timers integration/reporting

  • Property svn:eol-style set to native
  • Property svn:executable set to *
File size: 15.5 KB
Line 
1#include "one_sided_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  extern CLogType logProtocol ;
12  extern CLogType logTimers ;
13
14  COneSidedServerBuffer::COneSidedServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
15                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
16                        : clientRank_(clientRank), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
17  {
18    MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
19    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
20    bufferIn >> controlAddr_;
21    createWindow(commSelf, interCommMerged) ;
22  }
23
24  void COneSidedServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
25  {
26    if (info.isActive(logTimers)) CTimer::get("create Windows").resume() ;
27    MPI_Comm interComm ;
28    xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
29    xios::MPI_Intercomm_merge(interComm, true, &winComm_) ;
30    CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
31    xios::MPI_Comm_free(&interComm) ;
32   
33    maxWindows_=MAX_WINDOWS ;
34    windows_.resize(maxWindows_) ;
35   
36    for(int i=0;i<maxWindows_;++i) 
37    {
38      MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
39      CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
40    }
41    MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
42    CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
43    if (info.isActive(logTimers)) CTimer::get("create Windows").suspend() ;
44    MPI_Barrier(winComm_) ;
45    MPI_Barrier(winComm_) ;
46
47  }
48
49  void COneSidedServerBuffer::receivedRequest(vector<char>& buffer)
50  {
51    size_t timeline ;
52    int nbSenders ;
53    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
54    bufferIn >> timeline ;
55    if (timeline==EVENT_BUFFER_RESIZE)
56    {
57      size_t AssociatedTimeline ;
58      size_t newSize ;
59      bufferIn >>AssociatedTimeline>>newSize ;
60      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
61    }
62    else // receive standard event
63    {
64      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
65                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
66      bufferIn>> nbSenders ;
67      nbSenders_[timeline] = nbSenders ;
68      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
69      if (pendingFullEvent==pendingFullEvents_.end()) 
70      {
71        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
72        pendingFullEvents_[timeline]=pendingEvent ;
73      }
74      else 
75      { 
76        pendingFullEvent->second.currentNbSenders++ ;
77        pendingFullEvent->second.buffers.push_back(this) ;
78      }
79   
80      int nbBlocs ; 
81      int count ;
82      int window ;
83      bufferIn >> nbBlocs ;
84      MPI_Aint bloc ;
85      auto& blocs = pendingBlocs_[timeline] ;
86      for(int i=0;i<nbBlocs;++i) 
87      {
88        bufferIn >> bloc >> count >> window;
89        blocs.push_back({bloc, count, window}) ;
90      }
91    }
92  }
93
94  void COneSidedServerBuffer::eventLoop(void)
95  {
96    int flag ;
97    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
98    if (pendingRmaRequests_.empty()) transferEvents() ;
99
100    if (!isLocked_)
101    {
102      if (lastBlocToFree_!=0)
103      {
104        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
105        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
106        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
107        MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
108        MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
109        MPI_Win_unlock(windowRank_,winControl_) ; 
110        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
111        lastBlocToFree_ = 0 ;       
112      }
113    }
114
115    if (buffers_.size()>1) 
116    {
117      if (buffers_.front()->getCount()==0) {
118        delete buffers_.front();
119        buffers_.pop_front() ; // if buffer is empty free buffer
120      }
121    }
122  }
123
124  void COneSidedServerBuffer::notifyClientFinalize(void)
125  {
126    eventLoop() ; // to free the last bloc
127    MPI_Aint finalize=1 ;
128    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
129    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
130    MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
131    MPI_Win_unlock(windowRank_,winControl_) ; 
132  }
133  void COneSidedServerBuffer::testPendingRequests(void)
134  {
135    if (!pendingRmaRequests_.empty())
136    {
137      int flag ;   
138
139      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
140      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
141      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
142     
143      if (flag==true) 
144      {
145        if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
146        for(auto& win : windowsLocked_) 
147        {
148          info(logProtocol)<<"unlock window "<<win<<endl ;
149          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
150          MPI_Win_unlock(windowRank_,windows_[win]) ; 
151          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
152        }
153        windowsLocked_.clear() ;
154       
155
156        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
157        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
158       
159        size_t transferedSize = 0 ;
160        for(auto& count : pendingRmaCount_) transferedSize+=count ;
161
162        if (info.isActive(logProtocol))
163        {
164          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
165          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
166                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
167                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
168          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
169         }
170
171        isLocked_=false ;
172        pendingRmaRequests_.clear() ;
173        pendingRmaStatus_.clear() ;
174        pendingRmaCount_.clear() ;
175        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
176       
177        for(auto & event : onTransferEvents_) 
178        {
179          size_t timeline = event.first ;
180
181          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
182          pendingFullEvent->second.nbSenders-- ;
183          pendingFullEvent->second.currentNbSenders-- ;
184         
185
186          auto completedFullEvent=completedFullEvents_.find(timeline) ;
187          if (completedFullEvent==completedFullEvents_.end()) 
188          {
189            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
190            completedFullEvents_[timeline]=pendingEvent ;
191          }
192          else 
193          {
194            completedFullEvent->second.currentNbSenders++ ;
195            completedFullEvent->second.buffers.push_back(this) ;
196          }
197          nbSenders_.erase(timeline) ;
198        } 
199        onTransferEvents_.clear() ;
200      }
201    }
202
203  }
204 
205  size_t COneSidedServerBuffer::remainSize(void)
206  {
207    if (!fixed_) return std::numeric_limits<size_t>::max() ;
208    else
209    {
210      if (currentBuffer_ == nullptr) return fixedSize_ ;
211      else return currentBuffer_->remain() ;
212    }
213  }
214
215  void COneSidedServerBuffer::transferEvents(void)
216  {
217    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
218    {
219      size_t remain=remainSize() ;
220      size_t transferedSize=0 ;
221
222      size_t timeline =  pendingBlocs_.begin()->first ;
223      auto& blocs = pendingBlocs_.begin()->second ;
224     
225      if (!bufferResize_.empty()) 
226      {
227        if (bufferResize_.front().first==timeline)
228        {
229          currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
230          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
231          bufferResize_.pop_front() ;
232          newBuffer(currentBufferSize_,fixed_) ;
233        }
234      }
235
236      size_t eventSize=0 ;
237      for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
238     
239      if (eventSize > remain) 
240      {
241        if ( eventSize <= currentBufferSize_) return ; // wait for free storage ;
242        else 
243        {
244          if (currentBuffer_==nullptr) remain = eventSize ;
245          else remain = currentBuffer_->remain() + fixedSize_ ;
246        }
247      }
248     
249      if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
250     
251      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
252      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
253      for(auto& bloc : blocs) 
254      {
255        int win=get<2>(bloc) ;
256        if (windowsLocked_.count(win)==0) 
257        {
258          info(logProtocol)<<"lock window "<<win<<endl ;
259          if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
260          MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
261          if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
262          windowsLocked_.insert(win) ;
263        }
264      }
265      isLocked_=true ;
266      do
267      {
268        transferEvent() ; // ok enough storage for this bloc
269       
270        transferedSize += eventSize ;
271        pendingBlocs_.erase(pendingBlocs_.begin()) ;
272       
273        //  break ; // transfering just one event temporary => to remove
274       
275        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
276
277        timeline =  pendingBlocs_.begin()->first ;
278        auto& blocs=pendingBlocs_.begin()->second ;
279       
280        if (!bufferResize_.empty()) 
281        {
282          if (bufferResize_.front().first==timeline)
283          {
284            currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
285            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
286            bufferResize_.pop_front() ;
287            newBuffer(currentBufferSize_,fixed_) ;
288          }
289        }
290
291        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
292        if (transferedSize+eventSize<=remain)
293        {
294          for(auto& bloc : blocs) 
295          {
296            int win=get<2>(bloc) ;
297            if (windowsLocked_.count(win)==0) 
298            {
299              info(logProtocol)<<"lock window "<<win<<endl ;
300              if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
301              MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
302              if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
303              windowsLocked_.insert(win) ;
304            }
305          }
306        }
307      }
308      while(transferedSize+eventSize<=remain) ;
309     
310    }
311  }
312 
313  void COneSidedServerBuffer::transferEvent(void)
314  {
315    MPI_Aint addr;
316    MPI_Aint offset ;
317
318    size_t size;
319    size_t start;
320    size_t count;
321    int window ;
322
323    auto& blocs=pendingBlocs_.begin()->second ;
324    size_t timeline = pendingBlocs_.begin() -> first ;
325 
326
327    for(auto& bloc : blocs)
328    {
329      addr = std::get<0>(bloc) ;
330      size = std::get<1>(bloc) ;
331      window = std::get<2>(bloc) ;
332
333      offset=0 ;
334
335      do
336      {
337        if (currentBuffer_!=nullptr)
338        {
339          currentBuffer_->reserve(size, start, count) ;
340     
341          if ( count > 0)
342          {
343            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
344            offset=MPI_Aint_add(offset, count) ;
345          }
346          currentBuffer_->reserve(size, start, count) ;
347     
348          if ( count > 0)
349          {
350            transferRmaRequest(timeline, addr, offset, currentBuffer_, start, count, window) ;
351            offset=MPI_Aint_add(offset, count) ;
352          }
353        }
354
355        if (size>0) 
356        {
357          if (fixed_) newBuffer(fixedSize_,fixed_) ;
358          else
359          {
360            currentBufferSize_ = std::max((size_t)(currentBufferSize_*growingFactor_), size) ;
361            newBuffer(currentBufferSize_,fixed_) ;
362          }
363        }
364      } while (size > 0 ) ;
365    }
366
367    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
368  }
369
370  void COneSidedServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
371  {
372    MPI_Request request ;
373    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
374    if (info.isActive(logProtocol))
375    {
376      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ;
377      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
378               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
379    }
380    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
381    MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
382    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
383    pendingRmaRequests_.push_back(request) ;
384    pendingRmaCount_.push_back(count) ;
385    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
386  }
387
388  void COneSidedServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
389  {
390    auto &completedEvent=completedEvents_[timeline] ;
391    size_t size=0 ;
392    for(auto& bloc : completedEvent) size+=bloc.count ;
393    char* buffer = new char[size] ;
394    size=0 ;
395   
396    ostringstream outStr ;
397    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
398                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
399    int i=0 ;
400    MPI_Aint addr ;
401    for(auto& bloc : completedEvent) 
402    {
403      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
404     
405      if (info.isActive(logProtocol))
406      {
407        size_t checksum=0 ;
408        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
409        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
410        i++ ;
411      }
412
413      size+=bloc.count ;
414      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
415      addr=bloc.addr ;
416      if (bloc.buffer->getCount()==0)
417      {
418        if (buffers_.size() > 1)
419        {
420          delete buffers_.front();
421          buffers_.pop_front() ; // if buffer is empty free buffer
422        }
423      }     
424    }
425    event.push(clientRank_, nullptr, buffer, size) ;
426    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
427    info(logProtocol)<<outStr.str()<<endl ;
428   
429    lastBlocToFree_=addr ;
430    /*
431    if (!isLocked_) MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, window_) ;
432    MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR) ;
433    MPI_Put(&addr, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, window_) ;
434    if (!isLocked_) MPI_Win_unlock(windowRank_,window_) ;
435   */
436    completedEvents_.erase(timeline) ;
437    eventLoop() ;
438  }
439
440 
441
442}
Note: See TracBrowser for help on using the repository browser.