source: XIOS3/trunk/src/transport/p2p_server_buffer.cpp @ 2594

Last change on this file since 2594 was 2594, checked in by jderouillat, 7 months ago

Update the p2p protocol to be a mirror protocol: the server buffers will strictly mirror the client buffers (number of buffers, positions of messages in the buffers). The memory consumption of the servers will be capped implicitly by the clients' behavior, so the time spent waiting for free buffers may appear again.

  • Property svn:executable set to *
File size: 18.6 KB
Line 
1#include "p2p_server_buffer.hpp"
2#include "xios_spl.hpp"
3#include "mpi.hpp"
4#include "timer.hpp"
5#include "buffer_in.hpp"
6
7
8
9namespace xios
10{
11  extern CLogType logProtocol ;
12
13  CP2pServerBuffer::CP2pServerBuffer(int clientRank, const MPI_Comm& commSelf, const MPI_Comm& interCommMerged, map<size_t, SPendingEvent>& pendingEvents, 
14                                               map<size_t, SPendingEvent>& completedEvents, vector<char>& buffer) 
15                        : clientRank_(clientRank), interCommMerged_(interCommMerged), pendingFullEvents_(pendingEvents), completedFullEvents_(completedEvents)
16  {
17    //MPI_Alloc_mem(controlSize_*sizeof(MPI_Aint), MPI_INFO_NULL, &control_) ;
18    //CBufferIn bufferIn(buffer.data(),buffer.size()) ;
19    //bufferIn >> controlAddr_;
20    createWindow(commSelf, interCommMerged) ;
21    countDeletedBuffers_ = 0;
22  }
23
24  void CP2pServerBuffer::createWindow(const MPI_Comm& commSelf, const MPI_Comm& interCommMerged)
25  {
26    CTimer::get("create Windows").resume() ;
27    //MPI_Comm interComm ;
28    //xios::MPI_Intercomm_create(commSelf, 0, interCommMerged, clientRank_, 0 , &interComm) ;
29    //xios::MPI_Intercomm_merge(interComm, true, &winComm_) ;
30    //CXios::getMpiGarbageCollector().registerCommunicator(winComm_) ;
31    //xios::MPI_Comm_free(&interComm) ;
32   
33    //maxWindows_=MAX_WINDOWS ;
34    //windows_.resize(maxWindows_) ;
35   
36    //for(int i=0;i<maxWindows_;++i)
37    //{
38    //  MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &windows_[i]);
39    //  CXios::getMpiGarbageCollector().registerWindow(windows_[i]) ;
40    //}
41    //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_, &winControl_);
42    //CXios::getMpiGarbageCollector().registerWindow(winControl_) ;
43    CTimer::get("create Windows").suspend() ;
44    //MPI_Barrier(winComm_) ;
45    //MPI_Barrier(winComm_) ;
46
47  }
48
49  void CP2pServerBuffer::receivedRequest(vector<char>& buffer)
50  {
51    size_t timeline ;
52    int nbSenders ;
53    CBufferIn bufferIn(buffer.data(),buffer.size()) ;
54    bufferIn >> timeline ;
55    if (timeline==EVENT_BUFFER_RESIZE)
56    {
57      size_t AssociatedTimeline ;
58      size_t newSize ;
59      bufferIn >>AssociatedTimeline>>newSize ;
60      bufferResize_.push_back({AssociatedTimeline,newSize}) ;
61    }
62    else // receive standard event
63    {
64      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline
65                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ;
66      bufferIn>> nbSenders ;
67      nbSenders_[timeline] = nbSenders ;
68      auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
69      if (pendingFullEvent==pendingFullEvents_.end()) 
70      {
71        SPendingEvent pendingEvent = {nbSenders,1,{this}} ;
72        pendingFullEvents_[timeline]=pendingEvent ;
73      }
74      else 
75      { 
76        pendingFullEvent->second.currentNbSenders++ ;
77        pendingFullEvent->second.buffers.push_back(this) ;
78      }
79   
80      int nbBlocs ; 
81      int count ;
82      int window ;
83      size_t start ; 
84      bufferIn >> nbBlocs ;
85      MPI_Aint bloc ;
86      auto& blocs = pendingBlocs_[timeline] ;
87      for(int i=0;i<nbBlocs;++i) 
88      {
89        bufferIn >> bloc >> count >> window >> start;
90        //info(logProtocol) << "Receiving window : "<<window << endl;
91        blocs.push_back({bloc, count, window,start}) ;
92      }
93    }
94  }
95
96  void CP2pServerBuffer::eventLoop(void)
97  {
98    int flag ;
99    if (!pendingRmaRequests_.empty()) testPendingRequests() ;
100    if (pendingRmaRequests_.empty()) transferEvents() ;
101
102    //if (!isLocked_)
103    //{
104      if (lastBlocToFree_!=0)
105      {
106        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ;
107        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ;
108        //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
109        //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ;
110        //MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
111        //MPI_Win_unlock(windowRank_,winControl_) ;
112        //if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ;
113        lastBlocToFree_ = 0 ;       
114      }
115    //}
116
117    if (buffers_.size()>1) 
118    {
119      if (buffers_.front()->getCount()==0) {
120        // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1)
121        //     the front buffer can be deleted, no new message will be sent through the front buffer
122        delete buffers_.front();
123        buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer
124        //info(logProtocol) << "Deleting win : " << countDeletedBuffers_  << endl;
125        countDeletedBuffers_++;
126      }
127    }
128  }
129
130  void CP2pServerBuffer::notifyClientFinalize(void)
131  {
132    eventLoop() ; // to free the last bloc
133    //MPI_Aint finalize=1 ;
134    //MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ;
135    //MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_FINALIZE*sizeof(MPI_Aint)) ;
136    //MPI_Put(&finalize, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ;
137    //MPI_Win_unlock(windowRank_,winControl_) ;
138    int dummy ;
139    MPI_Send(&dummy, 0, MPI_CHAR, clientRank_, 22, interCommMerged_) ;
140  }
141 
142  void CP2pServerBuffer::testPendingRequests(void)
143  {
144    if (!pendingRmaRequests_.empty())
145    {
146      int flag ;   
147
148      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ;
149      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ;
150      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ;
151     
152      if (flag==true) 
153      {
154        //if (!isLocked_) ERROR("void COneSidedServerBuffer::testPendingRequests(void)",<<"windows is not Locked");
155        //for(auto& win : windowsLocked_)
156        //{
157        //  info(logProtocol)<<"unlock window "<<win<<endl ;
158        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ;
159        //  MPI_Win_unlock(windowRank_,windows_[win]) ;
160        //  if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ;
161        //}
162        //windowsLocked_.clear() ;
163       
164
165        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ;
166        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ;
167       
168        size_t transferedSize = 0 ;
169        for(auto& count : pendingRmaCount_) transferedSize+=count ;
170
171        if (info.isActive(logProtocol))
172        {
173          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ;
174          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size()
175                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s"
176                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ;
177          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ;
178          for(int i=0;i<pendingRmaAddr_.size();i++)
179          {
180            size_t checksum=0 ;
181            unsigned char* buffer = (unsigned char*) pendingRmaAddr_[i] ;
182            for(size_t j=0;j<pendingRmaCount_[i];j++) checksum += buffer[j] ;
183            info(logProtocol)<<"Bloc transfered to adrr="<<(void*) buffer<<"  count="<<pendingRmaCount_[i]<<"  checksum="<<checksum<<endl ;
184          }
185
186         }
187
188        //isLocked_=false ;
189        pendingRmaRequests_.clear() ;
190        pendingRmaStatus_.clear() ;
191        pendingRmaCount_.clear() ;
192        pendingRmaAddr_.clear() ;
193        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ;
194       
195        for(auto & event : onTransferEvents_) 
196        {
197          size_t timeline = event.first ;
198
199          auto pendingFullEvent=pendingFullEvents_.find(timeline) ;
200          pendingFullEvent->second.nbSenders-- ;
201          pendingFullEvent->second.currentNbSenders-- ;
202         
203
204          auto completedFullEvent=completedFullEvents_.find(timeline) ;
205          if (completedFullEvent==completedFullEvents_.end()) 
206          {
207            SPendingEvent pendingEvent = {nbSenders_[timeline],1,{this}} ;
208            completedFullEvents_[timeline]=pendingEvent ;
209          }
210          else 
211          {
212            completedFullEvent->second.currentNbSenders++ ;
213            completedFullEvent->second.buffers.push_back(this) ;
214          }
215          nbSenders_.erase(timeline) ;
216        } 
217        onTransferEvents_.clear() ;
218      }
219    }
220
221  }
222 
223  size_t CP2pServerBuffer::remainSize(void)
224  {
225    if (!fixed_) return std::numeric_limits<size_t>::max() ;
226    else
227    {
228      if (currentBuffer_ == nullptr) return fixedSize_ ;
229      else return currentBuffer_->remain() ;
230    }
231  }
232 
233  size_t CP2pServerBuffer::remainSize(int bufferId)
234  {
235    if (bufferId-countDeletedBuffers_>=buffers_.size())
236    {
237      //info(logProtocol) << "The buffer " << bufferId << " is not yet allocated" << endl;
238      return 0;
239    }     
240    return buffers_[bufferId-countDeletedBuffers_]->remain() ;
241  }
242
243
244  void CP2pServerBuffer::transferEvents(void)
245  {
246    if (pendingRmaRequests_.empty() && !pendingBlocs_.empty())
247    {
248      size_t remain=remainSize() ;
249      size_t transferedSize=0 ;
250
251      size_t timeline =  pendingBlocs_.begin()->first ;
252      auto& blocs = pendingBlocs_.begin()->second ; // map<size_t  , list<tuple<MPI_Aint,int ,int,size_t>>> pendingBlocs_;
253                                                    //     timeline,            addr    ,size,win,start
254      // addr   = std::get<0>(bloc) ;
255      // size   = std::get<1>(bloc) ;
256      // window = std::get<2>(bloc) ;
257      // start  = std::get<3>(bloc) ; // start : used to check mirror behavior
258
259      size_t eventSize=0 ;
260     
261      //if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked");
262     
263      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ;
264      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ;
265      //for(auto& bloc : blocs)
266      //{
267      //  int win=get<2>(bloc) ;
268      //  if (windowsLocked_.count(win)==0)
269      //  {
270      //    info(logProtocol)<<"lock window "<<win<<endl ;
271      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
272      //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
273      //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
274      //    windowsLocked_.insert(win) ;
275      //  }
276      //}
277      //isLocked_=true ;
278//      do
279
280      bool spaceForAllblocks = true;
281      int lastBufferUsed = -1;
282      if (blocs.size()==0) spaceForAllblocks = false;
283      else
284      {
285        for(auto& bloc : blocs)
286        {
287          //info(logProtocol) << "blocSize = " << get<1>(bloc)
288          //                  << " - remain in win : " << get<2>(bloc) << " : " << remainSize( get<2>(bloc) )
289          //                  << "; bufferResize_ = " <<  bufferResize_.size() << endl;
290         
291          // if the active buffer change, the new buffer must be considered as empty
292          if (lastBufferUsed != get<2>(bloc) ) eventSize = 0;
293
294          // if the targeted buffer does not exist
295          if ( get<2>(bloc)-countDeletedBuffers_>=buffers_.size() )
296          {
297            if ( bufferResize_.empty() ) // no resize order
298            {
299              spaceForAllblocks = false;
300              break;
301            }
302          }
303          else if ( ( get<1>(bloc) > (remainSize(get<2>(bloc))-eventSize) ) )  // if there is no enough place in the targeted bloc
304          {
305            spaceForAllblocks = false;
306            break;
307          }
308          else
309          {
310            // if there is enough place in the targeted bloc, store the
311            lastBufferUsed = get<2>(bloc);
312            eventSize += get<1>(bloc);
313          }
314        }
315      }     
316
317      if (spaceForAllblocks)
318      {
319        transferEvent() ; // ok enough storage for this bloc
320       
321        transferedSize += eventSize ;
322        pendingBlocs_.erase(pendingBlocs_.begin()) ;
323       
324        //  break ; // transfering just one event temporary => to remove
325       
326//        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop
327//
328//        timeline =  pendingBlocs_.begin()->first ;
329//        auto& blocs=pendingBlocs_.begin()->second ;
330//       
331//
332//        for(auto& bloc : blocs) eventSize+=get<1>(bloc) ;
333//        if (transferedSize+eventSize<=remain)
334        {
335          //for(auto& bloc : blocs)
336          //{
337          //  int win=get<2>(bloc) ;
338          //  if (windowsLocked_.count(win)==0)
339          //  {
340          //    info(logProtocol)<<"lock window "<<win<<endl ;
341          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ;
342          //    MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ;
343          //    if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ;
344          //    windowsLocked_.insert(win) ;
345          //  }
346          //}
347        }
348      }
349//      while(transferedSize+eventSize<=remain) ;
350     
351    }
352  }
353 
354  void CP2pServerBuffer::transferEvent(void)
355  {
356    MPI_Aint addr;
357    MPI_Aint offset ;
358
359    size_t size;
360    size_t start;
361    size_t count;
362    int window ;
363
364    auto& blocs=pendingBlocs_.begin()->second ;
365    size_t timeline = pendingBlocs_.begin() -> first ;
366 
367
368    for(auto& bloc : blocs)
369    {
370      addr = std::get<0>(bloc) ;
371      size = std::get<1>(bloc) ;
372      window = std::get<2>(bloc) ;
373      start = std::get<3>(bloc) ; // start : used to check mirror behavior
374
375      offset=0 ;
376
377      // Need to keep loop even if a given bloc will not be split.
378      // To mimic client behavior, especially if (size_==end_) reset end_ = 0 ;
379      do
380      {
381        //if ( (currentBuffer_!=nullptr) || (window-countDeletedBuffers_ == buffers_.size() ) )
382        {
383          if (window-countDeletedBuffers_ >= buffers_.size())
384            {
385              if (!bufferResize_.empty()) 
386                {
387                  if (bufferResize_.front().first==timeline)
388                    {
389                      currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ;
390                      //info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ;
391                      bufferResize_.pop_front() ;
392                      newBuffer(currentBufferSize_,fixed_) ;
393                    }
394                }
395            }
396         
397          buffers_[window-countDeletedBuffers_]->reserve(size, start, count) ;
398     
399          if ( count > 0)
400          {
401            transferRmaRequest(timeline, addr, offset, buffers_[window-countDeletedBuffers_], start, count, window) ;
402            offset=MPI_Aint_add(offset, count) ;
403           
404          }
405        }
406
407      } while (size > 0 ) ;
408    }
409
410    pendingRmaStatus_.resize(pendingRmaRequests_.size()) ;
411  }
412
413  void CP2pServerBuffer::transferRmaRequest(size_t timeline, MPI_Aint addr, MPI_Aint offset, CBuffer* buffer, size_t start, int count, int window)
414  {
415    MPI_Request request ;
416    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ;
417    if (info.isActive(logProtocol))
418    {
419      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<"  window="<<window<<endl ;
420      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1)
421               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ;
422    }
423    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ;
424    //MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ;
425    MPI_Irecv(buffer->getBuffer()+start, count, MPI_CHAR, clientRank_, 21, interCommMerged_, &request) ;
426    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ;
427    pendingRmaRequests_.push_back(request) ;
428    pendingRmaCount_.push_back(count) ;
429    pendingRmaAddr_.push_back(buffer->getBuffer()+start) ;
430    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ;
431  }
432
433  void CP2pServerBuffer::fillEventServer(size_t timeline, CEventServer& event)
434  {
435    auto &completedEvent=completedEvents_[timeline] ;
436    size_t size=0 ;
437    for(auto& bloc : completedEvent) size+=bloc.count ;
438    char* buffer = new char[size] ;
439    size=0 ;
440   
441    ostringstream outStr ;
442    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline
443                                          <<"  nbBlocs="<<completedEvent.size()<<endl ;
444    int i=0 ;
445    MPI_Aint addr ;
446    for(auto& bloc : completedEvent) 
447    {
448      memcpy(&buffer[size], bloc.buffer->getBuffer()+bloc.start, bloc.count) ;
449     
450      if (info.isActive(logProtocol))
451      {
452        size_t checksum=0 ;
453        for(size_t j=0;j<bloc.count;j++) checksum += (unsigned char) buffer[size+j] ;
454        outStr<<"bloc "<<i<<"  count="<<bloc.count<<" checksum="<<checksum<<"  ;  " ;
455        i++ ;
456      }
457
458      size+=bloc.count ;
459      //info(logProtocol) << "Free from : " << bloc.start << ", size : " << bloc.count<< endl;
460      bloc.buffer->free(bloc.start, bloc.count) ; // free bloc
461      addr=bloc.addr ;
462      if (bloc.buffer->getCount()==0)
463      {
464        if (buffers_.size() > 1)
465        {
466          // If the front buffer is empty and if another buffer become the active one (buffers_.size()>1)
467          //     the front buffer can be deleted, no new message will be sent through the front buffer
468          delete buffers_.front();
469          buffers_.erase(buffers_.begin()) ; // if buffer is empty free buffer
470          //info(logProtocol) << "Deleting win : " << countDeletedBuffers_  << endl;
471          countDeletedBuffers_++;
472        }
473      }
474    }
475    event.push(clientRank_, nullptr, buffer, size) ;
476    if (info.isActive(logProtocol)) outStr<<" ==> nbSenders="<<event.getNbSender() ;
477    info(logProtocol)<<outStr.str()<<endl ;
478   
479    lastBlocToFree_=addr ;
480
481    completedEvents_.erase(timeline) ;
482    eventLoop() ;
483  }
484
485 
486
487}
Note: See TracBrowser for help on using the repository browser.