source: XIOS3/trunk/src/transport/p2p_server_buffer.cpp

Last change on this file was 2628, checked in by jderouillat, 2 months ago

New timers integration/reporting

  • Property svn:executable set to *
File size: 18.7 KB
Line 
1#include "p2p_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  extern CLogType logProtocol ;
12  extern CLogType logTimers ;
13
14  CP2pServerBuffer::CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
15                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
16                        : clientRank_(clientRank), interCommMerged_(interCommMerged), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
17  {
18    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
19    //CBufferIn bufferIn(buffer.data(),buffer.size()) ;
20    //bufferIn >> controlAddr_;
21    createWindow(commSelf, interCommMerged) ;
22    countDeletedBuffers_ = 0;
23  }
24
25  void CP2pServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
26  {
27    if (info.isActive(logTimers)) CTimer::get("create Windows").resume() ;
28    //MPI_Comm interComm ;
29    //xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
30    //xios::MPI_Intercomm_merge(interComm, true, &winComm_) ;
31    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
32    //xios::MPI_Comm_free(&interComm) ;
33   
34    //maxWindows_=MAX_WINDOWS ;
35    //windows_.resize(maxWindows_) ;
36   
37    //for(int i=0;i<maxWindows_;++i)
38    //{
39    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
40    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
41    //}
42    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
43    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
44    if (info.isActive(logTimers)) CTimer::get("create Windows").suspend() ;
45    //MPI_Barrier(winComm_) ;
46    //MPI_Barrier(winComm_) ;
47
48  }
49
50  void CP2pServerBuffer::receivedRequest(vector<char>& buffer)
51  {
52    size_t timeline ;
53    int nbSenders ;
54    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
55    bufferIn >> timeline ;
56    if (timeline==EVENT_BUFFER_RESIZE)
57    {
58      size_t AssociatedTimeline ;
59      size_t newSize ;
60      bufferIn >>AssociatedTimeline>>newSize ;
61      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
62    }
63    else // receive standard event
64    {
65      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
66                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
67      bufferIn>> nbSenders ;
68      nbSenders_[timeline] = nbSenders ;
69      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
70      if (pendingFullEvent==pendingFullEvents_.end()) 
71      {
72        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
73        pendingFullEvents_[timeline]=pendingEvent ;
74      }
75      else 
76      { 
77        pendingFullEvent->second.currentNbSenders++ ;
78        pendingFullEvent->second.buffers.push_back(this) ;
79      }
80   
81      int nbBlocs ; 
82      int count ;
83      int window ;
84      size_t start ; 
85      bufferIn >> nbBlocs ;
86      MPI_Aint bloc ;
87      auto& blocs = pendingBlocs_[timeline] ;
88      for(int i=0;i<nbBlocs;++i) 
89      {
90        bufferIn >> bloc >> count >> window >> start;
91        //info(logProtocol) << "Receiving window : "<<window << endl;
92        blocs.push_back({bloc, count, window,start}) ;
93      }
94    }
95  }
96
97  void CP2pServerBuffer::eventLoop(void)
98  {
99    int flag ;
100    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
101    if (pendingRmaRequests_.empty()) transferEvents() ;
102
103    //if (!isLocked_)
104    //{
105      if (lastBlocToFree_!=0)
106      {
107        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
108        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
109        //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
110        //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
111        //MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
112        //MPI_Win_unlock(windowRank_,winControl_) ;
113        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
114        lastBlocToFree_ = 0 ;       
115      }
116    //}
117
118    if (buffers_.size()>1) 
119    {
120      if (buffers_.front()->getCount()==0) {
121        // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1)
122        //     the front buffer can be deleted, no new message will be sent through the front buffer
123        delete buffers_.front();
124        buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer
125        //info(logProtocol) << "Deleting win : " << countDeletedBuffers_  << endl;
126        countDeletedBuffers_++;
127      }
128    }
129  }
130
131  void CP2pServerBuffer::notifyClientFinalize(void)
132  {
133    eventLoop() ; // to free the last bloc
134    //MPI_Aint finalize=1 ;
135    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
136    //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
137    //MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
138    //MPI_Win_unlock(windowRank_,winControl_) ;
139    int dummy ;
140    MPI_Send(&dummy, 0, MPI_CHAR, clientRank_, 22, interCommMerged_) ;
141  }
142 
143  void CP2pServerBuffer::testPendingRequests(void)
144  {
145    if (!pendingRmaRequests_.empty())
146    {
147      int flag ;   
148
149      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
150      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
151      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
152     
153      if (flag==true) 
154      {
155        //if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
156        //for(auto& win : windowsLocked_)
157        //{
158        //  info(logProtocol)<<"unlock window "<<win<<endl ;
159        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
160        //  MPI_Win_unlock(windowRank_,windows_[win]) ;
161        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
162        //}
163        //windowsLocked_.clear() ;
164       
165
166        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
167        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
168       
169        size_t transferedSize = 0 ;
170        for(auto& count : pendingRmaCount_) transferedSize+=count ;
171
172        if (info.isActive(logProtocol))
173        {
174          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
175          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
176                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
177                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
178          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
179          for(int i=0;i<pendingRmaAddr_.size();i++)
180          {
181            size_t checksum=0 ;
182            unsigned char* buffer = (unsigned char*) pendingRmaAddr_[i] ;
183            for(size_t j=0;j<pendingRmaCount_[i];j++) checksum += buffer[j] ;
184            info(logProtocol)<<"Bloc transfered to adrr="<<(void*) buffer<<"  count="<<pendingRmaCount_[i]<<"  checksum="<<checksum<<endl ;
185          }
186
187         }
188
189        //isLocked_=false ;
190        pendingRmaRequests_.clear() ;
191        pendingRmaStatus_.clear() ;
192        pendingRmaCount_.clear() ;
193        pendingRmaAddr_.clear() ;
194        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
195       
196        for(auto & event : onTransferEvents_) 
197        {
198          size_t timeline = event.first ;
199
200          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
201          pendingFullEvent->second.nbSenders-- ;
202          pendingFullEvent->second.currentNbSenders-- ;
203         
204
205          auto completedFullEvent=completedFullEvents_.find(timeline) ;
206          if (completedFullEvent==completedFullEvents_.end()) 
207          {
208            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
209            completedFullEvents_[timeline]=pendingEvent ;
210          }
211          else 
212          {
213            completedFullEvent->second.currentNbSenders++ ;
214            completedFullEvent->second.buffers.push_back(this) ;
215          }
216          nbSenders_.erase(timeline) ;
217        } 
218        onTransferEvents_.clear() ;
219      }
220    }
221
222  }
223 
224  size_t CP2pServerBuffer::remainSize(void)
225  {
226    if (!fixed_) return std::numeric_limits<size_t>::max() ;
227    else
228    {
229      if (currentBuffer_ == nullptr) return fixedSize_ ;
230      else return currentBuffer_->remain() ;
231    }
232  }
233 
234  size_t CP2pServerBuffer::remainSize(int bufferId)
235  {
236    if (bufferId-countDeletedBuffers_>=buffers_.size())
237    {
238      //info(logProtocol) << "The buffer " << bufferId << " is not yet allocated" << endl;
239      return 0;
240    }     
241    return buffers_[bufferId-countDeletedBuffers_]->remain() ;
242  }
243
244
245  void CP2pServerBuffer::transferEvents(void)
246  {
247    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
248    {
249      size_t remain=remainSize() ;
250      size_t transferedSize=0 ;
251
252      size_t timeline =  pendingBlocs_.begin()->first ;
253      auto& blocs = pendingBlocs_.begin()->second ; // map<size_t  , list<tuple<MPI_Aint,int ,int,size_t>>> pendingBlocs_;
254                                                    //     timeline,            addr    ,size,win,start
255      // addr   = std::get<0>(bloc) ;
256      // size   = std::get<1>(bloc) ;
257      // window = std::get<2>(bloc) ;
258      // start  = std::get<3>(bloc) ; // start : used to check mirror behavior
259
260      size_t eventSize=0 ;
261     
262      //if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
263     
264      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
265      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
266      //for(auto& bloc : blocs)
267      //{
268      //  int win=get<2>(bloc) ;
269      //  if (windowsLocked_.count(win)==0)
270      //  {
271      //    info(logProtocol)<<"lock window "<<win<<endl ;
272      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
273      //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
274      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
275      //    windowsLocked_.insert(win) ;
276      //  }
277      //}
278      //isLocked_=true ;
279//      do
280
281      bool spaceForAllblocks = true;
282      int lastBufferUsed = -1;
283      if (blocs.size()==0) spaceForAllblocks = false;
284      else
285      {
286        for(auto& bloc : blocs)
287        {
288          //info(logProtocol) << "blocSize = " << get<1>(bloc)
289          //                  << " - remain in win : " << get<2>(bloc) << " : " << remainSize( get<2>(bloc) )
290          //                  << "; bufferResize_ = " <<  bufferResize_.size() << endl;
291         
292          // if the active buffer change, the new buffer must be considered as empty
293          if (lastBufferUsed != get<2>(bloc) ) eventSize = 0;
294
295          // if the targeted buffer does not exist
296          if ( get<2>(bloc)-countDeletedBuffers_>=buffers_.size() )
297          {
298            if ( bufferResize_.empty() ) // no resize order
299            {
300              spaceForAllblocks = false;
301              break;
302            }
303          }
304          else if ( ( get<1>(bloc) > (remainSize(get<2>(bloc))-eventSize) ) )  // if there is no enough place in the targeted bloc
305          {
306            spaceForAllblocks = false;
307            break;
308          }
309          else
310          {
311            // if there is enough place in the targeted bloc, store the
312            lastBufferUsed = get<2>(bloc);
313            eventSize += get<1>(bloc);
314          }
315        }
316      }     
317
318      if (spaceForAllblocks)
319      {
320        transferEvent() ; // ok enough storage for this bloc
321       
322        transferedSize += eventSize ;
323        pendingBlocs_.erase(pendingBlocs_.begin()) ;
324       
325        //  break ; // transfering just one event temporary => to remove
326       
327//        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
328//
329//        timeline =  pendingBlocs_.begin()->first ;
330//        auto& blocs=pendingBlocs_.begin()->second ;
331//       
332//
333//        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
334//        if (transferedSize+eventSize<=remain)
335        {
336          //for(auto& bloc : blocs)
337          //{
338          //  int win=get<2>(bloc) ;
339          //  if (windowsLocked_.count(win)==0)
340          //  {
341          //    info(logProtocol)<<"lock window "<<win<<endl ;
342          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
343          //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
344          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
345          //    windowsLocked_.insert(win) ;
346          //  }
347          //}
348        }
349      }
350//      while(transferedSize+eventSize<=remain) ;
351     
352    }
353  }
354 
355  void CP2pServerBuffer::transferEvent(void)
356  {
357    MPI_Aint addr;
358    MPI_Aint offset ;
359
360    size_t size;
361    size_t start;
362    size_t count;
363    int window ;
364
365    auto& blocs=pendingBlocs_.begin()->second ;
366    size_t timeline = pendingBlocs_.begin() -> first ;
367 
368
369    for(auto& bloc : blocs)
370    {
371      addr = std::get<0>(bloc) ;
372      size = std::get<1>(bloc) ;
373      window = std::get<2>(bloc) ;
374      start = std::get<3>(bloc) ; // start : used to check mirror behavior
375
376      offset=0 ;
377
378      // Need to keep loop even if a given bloc will not be split.
379      // To mimic client behavior, especially if (size_==end_) reset end_ = 0 ;
380      do
381      {
382        //if ( (currentBuffer_!=nullptr) || (window-countDeletedBuffers_ == buffers_.size() ) )
383        {
384          if (window-countDeletedBuffers_ >= buffers_.size())
385            {
386              if (!bufferResize_.empty()) 
387                {
388                  if (bufferResize_.front().first==timeline)
389                    {
390                      currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
391                      //info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
392                      bufferResize_.pop_front() ;
393                      newBuffer(currentBufferSize_,fixed_) ;
394                    }
395                }
396            }
397         
398          buffers_[window-countDeletedBuffers_]->reserve(size, start, count) ;
399     
400          if ( count > 0)
401          {
402            transferRmaRequest(timeline, addr, offset, buffers_[window-countDeletedBuffers_], start, count, window) ;
403            offset=MPI_Aint_add(offset, count) ;
404           
405          }
406        }
407
408      } while (size > 0 ) ;
409    }
410
411    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
412  }
413
414  void CP2pServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
415  {
416    MPI_Request request ;
417    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
418    if (info.isActive(logProtocol))
419    {
420      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<"  window="<<window<<endl ;
421      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
422               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
423    }
424    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
425    //MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
426    MPI_Irecv(buffer->getBuffer()+start, count, MPI_CHAR, clientRank_, 21, interCommMerged_, &request) ;
427    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
428    pendingRmaRequests_.push_back(request) ;
429    pendingRmaCount_.push_back(count) ;
430    pendingRmaAddr_.push_back(buffer->getBuffer()+start) ;
431    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
432  }
433
434  void CP2pServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
435  {
436    auto &completedEvent=completedEvents_[timeline] ;
437    size_t size=0 ;
438    for(auto& bloc : completedEvent) size+=bloc.count ;
439    char* buffer = new char[size] ;
440    size=0 ;
441   
442    ostringstream outStr ;
443    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
444                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
445    int i=0 ;
446    MPI_Aint addr ;
447    for(auto& bloc : completedEvent) 
448    {
449      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
450     
451      if (info.isActive(logProtocol))
452      {
453        size_t checksum=0 ;
454        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
455        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
456        i++ ;
457      }
458
459      size+=bloc.count ;
460      //info(logProtocol) << "Free from : " << bloc.start << ", size : " << bloc.count<< endl;
461      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
462      addr=bloc.addr ;
463      if (bloc.buffer->getCount()==0)
464      {
465        if (buffers_.size() > 1)
466        {
467          // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1)
468          //     the front buffer can be deleted, no new message will be sent through the front buffer
469          delete buffers_.front();
470          buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer
471          //info(logProtocol) << "Deleting win : " << countDeletedBuffers_  << endl;
472          countDeletedBuffers_++;
473        }
474      }
475    }
476    event.push(clientRank_, nullptr, buffer, size) ;
477    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
478    info(logProtocol)<<outStr.str()<<endl ;
479   
480    lastBlocToFree_=addr ;
481
482    completedEvents_.erase(timeline) ;
483    eventLoop() ;
484  }
485
486 
487
488}
Note: See TracBrowser for help on using the repository browser.