Changeset 2399


Timestamp: 09/09/22 17:23:16 (21 months ago)
Author: ymipsl
Message:

- Fix a performance issue in the one_sided protocol

  • Better timer instrumentation of the protocol

YM

Location: XIOS3/trunk/src
Files: 8 edited

  • XIOS3/trunk/src/server.cpp

    r2335 r2399  
    6060      else is_MPI_Initialized=false ; 
    6161      MPI_Comm globalComm=CXios::getGlobalComm() ; 
    62  
     62      CTimer::get("XIOS server").resume() ; 
    6363      ///////////////////////////////////////// 
    6464      ///////////// PART 1 //////////////////// 
     
    409409    { 
    410410      CTimer::get("XIOS").suspend() ; 
    411       
     411      CTimer::get("XIOS server").suspend() ; 
    412412      delete eventScheduler ; 
    413413 
  • XIOS3/trunk/src/timer.hpp

    r2274 r2399  
    2020      void resume(void); 
    2121      void reset(void); 
     22      void add(double time) { cumulatedTime+=time ;} 
     23      void minus(double time) { cumulatedTime-=time ;} 
    2224      double getCumulatedTime(void); 
    2325      static std::map<std::string,CTimer> allTimer; 
     
    2628      static std::string getAllCumulatedTime(void) ; 
    2729      static void release(void) { allTimer.clear() ;} 
     30      bool isSuspended() { return suspended; } 
    2831  }; 
    2932} 
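
The additions to timer.hpp (add, minus, isSuspended), together with the "XIOS server" timer that server.cpp now resumes at start-up and suspends at shutdown, are the plumbing the rest of the changeset builds on: a named cumulative timer whose total can be credited or debited while it runs and whose running state can be queried. The following is a minimal, self-contained sketch of such a timer; SketchTimer is a hypothetical stand-in, not the real xios::CTimer.

#include <chrono>
#include <iostream>
#include <map>
#include <string>

class SketchTimer
{
  public:
    void resume()            { if (suspended_) { begin_ = Clock::now(); suspended_ = false; } }
    void suspend()           { if (!suspended_) { cumulatedTime_ += elapsed(); suspended_ = true; } }
    void reset()             { cumulatedTime_ = 0.; }
    void add(double time)    { cumulatedTime_ += time; }   // credit time measured elsewhere
    void minus(double time)  { cumulatedTime_ -= time; }   // debit time that must not be counted here
    bool isSuspended() const { return suspended_; }
    double getCumulatedTime() const { return suspended_ ? cumulatedTime_ : cumulatedTime_ + elapsed(); }

    static SketchTimer& get(const std::string& name) { return allTimer_[name]; }

  private:
    using Clock = std::chrono::steady_clock;
    double elapsed() const { return std::chrono::duration<double>(Clock::now() - begin_).count(); }

    Clock::time_point begin_;
    double cumulatedTime_ = 0.;
    bool suspended_ = true;
    static std::map<std::string, SketchTimer> allTimer_;
};
std::map<std::string, SketchTimer> SketchTimer::allTimer_;

int main()
{
  auto& t = SketchTimer::get("XIOS server");
  t.resume();                      // bracket the whole server run, as server.cpp now does
  t.suspend();
  t.add(1.0);                      // credit one second measured by some other means
  t.minus(0.25);                   // debit time that must not be attributed to the server
  std::cout << "XIOS server : " << t.getCumulatedTime() << " s\n";
  std::cout << "suspended   : " << std::boolalpha << t.isSuspended() << "\n";
}

Keeping the registry as a static map of named timers is what lets call sites such as CTimer::get("XIOS server") reach the same accumulator from anywhere in the code.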
  • XIOS3/trunk/src/transport/one_sided_client_buffer.cpp

    r2347 r2399  
    4343    CXios::getMpiGarbageCollector().registerWindow(winControl_) ; 
    4444 
    45      
    46  
    47     CTimer::get("create Windows").suspend() ; 
    48  
    4945    MPI_Barrier(winComm_) ; 
    5046    MPI_Win_attach(winControl_, control_, controlSize_*sizeof(MPI_Aint)) ; 
    5147    MPI_Barrier(winComm_) ; 
     48    CTimer::get("create Windows").suspend() ; 
     49  
    5250 //   MPI_Win_lock(MPI_LOCK_EXCLUSIVE, 0, 0, winControl_) ; 
    5351 //   MPI_Win_unlock(0,winControl_) ; 
     
    217215    { 
    218216      request = requests_.front() ; 
     217      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").resume() ; 
    219218      MPI_Test(&request.mpiRequest, &flag, MPI_STATUS_IGNORE) ; 
     219      if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Test").suspend() ; 
    220220      if (flag==true) 
    221221      { 
     
    260260      } 
    261261    } 
     262    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").resume() ; 
    262263    MPI_Isend(request.buffer->start(),request.buffer->count(), MPI_CHAR, serverRank_, 20, interComm_, &request.mpiRequest ) ; 
     264    if (info.isActive(logProtocol)) CTimer::get("sendTimelineEvent : MPI_Isend").suspend() ; 
    263265    info(logProtocol)<<outStr.str()<<endl ; 
    264266    requests_.push_back(request) ; 
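
In one_sided_client_buffer.cpp the "create Windows" timer is now suspended only after the barrier/attach sequence, so window creation is measured in full, and the MPI_Test / MPI_Isend calls in the send path are timed only when protocol logging is enabled. The guard matters because this path is polled very frequently; a timer lookup plus two clock reads per call would otherwise add measurable overhead. Below is a sketch of that guarded-instrumentation pattern with hypothetical helpers (protocolLogActive, timedSection); the real code uses info.isActive(logProtocol) and CTimer directly.

#include <chrono>
#include <functional>
#include <iostream>
#include <string>

static bool protocolLogActive = false;   // stands in for info.isActive(logProtocol)

// Run "work" and, only when protocol logging is active, report its duration;
// when logging is off, the clock is never read and no timer lookup happens.
static void timedSection(const std::string& name, const std::function<void()>& work)
{
  if (!protocolLogActive) { work(); return; }

  auto begin = std::chrono::steady_clock::now();
  work();
  double seconds = std::chrono::duration<double>(std::chrono::steady_clock::now() - begin).count();
  std::cout << name << " : " << seconds << " s\n";
}

int main()
{
  // Hot path with instrumentation disabled: no timing overhead at all.
  timedSection("sendTimelineEvent : MPI_Test", []{ /* MPI_Test(&request, &flag, ...) would run here */ });

  // Same call sites with protocol logging enabled: durations are measured and reported.
  protocolLogActive = true;
  timedSection("sendTimelineEvent : MPI_Test",  []{ /* MPI_Test would run here  */ });
  timedSection("sendTimelineEvent : MPI_Isend", []{ /* MPI_Isend would run here */ });
}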
  • XIOS3/trunk/src/transport/one_sided_client_buffer.hpp

    r2343 r2399  
    190190      bool fixed_=false; 
    191191      size_t fixedSize_ = 0 ; 
    192       size_t currentBufferSize_=0 ; 
     192      size_t currentBufferSize_= 0 ; 
    193193      double growingFactor_ = 2. ;  
    194194      MPI_Aint lastFreedBloc_=0 ; 
  • XIOS3/trunk/src/transport/one_sided_context_client.cpp

    r2343 r2399  
    9696        } 
    9797        itBuffer->second->eventLoop() ; 
     98        double time=CTimer::getTime() ; 
    9899        bool succed = itBuffer->second->writeEvent(timeLine, event)  ; 
     100        if (succed)  
     101        { 
     102          time=CTimer::getTime()-time ; 
     103          if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").minus(time) ; 
     104        } 
     105 
    99106        if (succed) event.remove() ; 
    100107        else event.next() ; 
    101         if (event.isFirst()) callGlobalEventLoop() ; 
    102       } 
    103        
     108        if (event.isFirst()) 
     109        { 
     110          if (CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").resume() ; 
     111          callGlobalEventLoop() ; 
     112        }  
     113      } 
     114      if (!CTimer::get("Blocking time").isSuspended()) CTimer::get("Blocking time").suspend() ; 
     115 
    104116      if (isAttachedModeEnabled()) // couldBuffer is always true in attached mode 
    105117      { 
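
The client-side change in one_sided_context_client.cpp introduces a "Blocking time" counter around the retry loop of the send path: the timer is resumed when the client is about to fall back to the global event loop while waiting for buffer space, the duration of a successful writeEvent is debited with minus(), and the timer is suspended once the event has been written. The total therefore approximates time the client spends blocked rather than doing useful work. A compact sketch of that subtract-on-success accounting, with a hypothetical Timer and tryWrite standing in for CTimer and writeEvent:

#include <chrono>
#include <iostream>

struct Timer
{
  using Clock = std::chrono::steady_clock;
  static double now() { return std::chrono::duration<double>(Clock::now().time_since_epoch()).count(); }

  void resume()        { if (suspended) { begin = now(); suspended = false; } }
  void suspend()       { if (!suspended) { total += now() - begin; suspended = true; } }
  void minus(double t) { total -= t; }
  bool isSuspended() const { return suspended; }

  double begin = 0., total = 0.;
  bool suspended = true;
};

// Pretend the server-side buffer frees up on the third attempt.
bool tryWrite(int attempt) { return attempt >= 2; }

int main()
{
  Timer blocking;
  bool written = false;

  for (int attempt = 0; !written; ++attempt)
  {
    double t = Timer::now();
    written = tryWrite(attempt);                       // corresponds to writeEvent(timeLine, event)
    if (written)
    {
      // A successful write is useful work, not blocking: debit its duration.
      if (!blocking.isSuspended()) blocking.minus(Timer::now() - t);
    }
    else
    {
      // About to yield to the event loop while waiting for buffer space: count this as blocking.
      if (blocking.isSuspended()) blocking.resume();
    }
  }
  if (!blocking.isSuspended()) blocking.suspend();

  std::cout << "Blocking time : " << blocking.total << " s\n";
}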
  • XIOS3/trunk/src/transport/one_sided_context_server.cpp

    r2343 r2399  
    9393    int flag; 
    9494    MPI_Status status; 
    95      
    96     traceOff(); 
    97     MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm, &flag, &status); 
    98     traceOn(); 
    99     if (flag==true) 
    100     { 
    101       requests_.push_back(CRequest(interComm, status)) ; 
    102       if (requests_.back().test())  
    103       { 
    104         processRequest(requests_.back()) ; 
    105         requests_.pop_back() ; 
     95    flag=true ; 
     96 
     97    while(flag) 
     98    { 
     99      traceOff(); 
     100      MPI_Iprobe(MPI_ANY_SOURCE, 20,interComm, &flag, &status); 
     101      traceOn(); 
     102      if (flag==true) 
     103      { 
     104        requests_.push_back(CRequest(interComm, status)) ; 
     105        if (requests_.back().test())  
     106        { 
     107          processRequest(requests_.back()) ; 
     108          requests_.pop_back() ; 
     109        } 
    106110      } 
    107111    } 
     
    139143    if (!pendingEvents_.empty()) 
    140144    { 
     145/* 
    141146      SPendingEvent& nextEvent = pendingEvents_.begin()->second ; 
    142147      for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ; 
    143148      if (nextEvent.nbSenders==0) pendingEvents_.erase(pendingEvents_.begin()) ; 
     149*/ 
     150      for(auto it=pendingEvents_.begin() ;  it!=pendingEvents_.end() ;) 
     151      { 
     152        SPendingEvent& nextEvent = it->second ; 
     153        for(auto& buffer : nextEvent.buffers ) buffer->eventLoop() ; 
     154        if (nextEvent.nbSenders==0) it=pendingEvents_.erase(it) ; 
     155        else ++it ; 
     156      } 
    144157    } 
    145158  } 
     
    188201          CEventServer event(this) ; 
    189202          for(auto& buffer : it->second.buffers) buffer->fillEventServer(currentTimeLine, event) ; 
    190  
     203          MPI_Barrier(intraComm) ; 
    191204          CTimer::get("Process events").resume(); 
    192205          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event.classId<<" of type "<<event.type<<endl ; 
     
    281294    else if (event.classId==CGrid::GetType()) CGrid::dispatchEvent(event); 
    282295    else if (event.classId==CGridGroup::GetType()) CGridGroup::dispatchEvent(event); 
    283     else if (event.classId==CField::GetType()) CField::dispatchEvent(event); 
     296    else if (event.classId==CField::GetType())  
     297    { 
     298      if (event.type==CField::EVENT_ID_UPDATE_DATA) CField::dispatchEvent(event); 
     299      else CField::dispatchEvent(event); 
     300    } 
    284301    else if (event.classId==CFieldGroup::GetType()) CFieldGroup::dispatchEvent(event); 
    285302    else if (event.classId==CFile::GetType()) CFile::dispatchEvent(event); 
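
Two changes in one_sided_context_server.cpp address the performance issue itself: the listen path now drains every pending incoming request in a while(flag) loop around MPI_Iprobe instead of consuming at most one per call, and the pending-event scan walks the whole map (using the iterator returned by erase) instead of inspecting only the first entry. A sketch of the draining loop follows; tag 20 matches the diff above, but the payload handling (MPI_Get_count / MPI_Recv into a byte buffer) is illustrative only and not how XIOS actually consumes the request.

#include <mpi.h>
#include <vector>

// Drain every request currently pending on the protocol tag instead of
// consuming at most one request per call.
void drainRequests(MPI_Comm interComm)
{
  int flag = 1;
  MPI_Status status;
  while (flag)
  {
    MPI_Iprobe(MPI_ANY_SOURCE, 20, interComm, &flag, &status);
    if (flag)
    {
      int count = 0;
      MPI_Get_count(&status, MPI_CHAR, &count);
      std::vector<char> payload(count);
      MPI_Recv(payload.data(), count, MPI_CHAR, status.MPI_SOURCE, 20, interComm, MPI_STATUS_IGNORE);
      // ... hand the payload over to the request/event machinery ...
    }
  }
}

int main(int argc, char** argv)
{
  MPI_Init(&argc, &argv);
  drainRequests(MPI_COMM_WORLD);   // in XIOS this would be the client/server inter-communicator
  MPI_Finalize();
}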
  • XIOS3/trunk/src/transport/one_sided_server_buffer.cpp

    r2346 r2399  
    6161    else // receive standard event 
    6262    { 
    63  
     63      info(logProtocol)<<"received request from rank : "<<clientRank_<<"  with timeline : "<<timeline 
     64                                                        <<"   at time : "<<CTimer::get("XIOS server").getTime()<<endl ; 
    6465      bufferIn>> nbSenders ; 
    6566      nbSenders_[timeline] = nbSenders ; 
     
    101102      { 
    102103        info(logProtocol)<<"Send bloc to free : "<<lastBlocToFree_<<endl ; 
     104        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").resume() ; 
    103105        MPI_Win_lock(MPI_LOCK_EXCLUSIVE, windowRank_, 0, winControl_) ; 
    104106        MPI_Aint target=MPI_Aint_add(controlAddr_, CONTROL_ADDR*sizeof(MPI_Aint)) ; 
    105107        MPI_Put(&lastBlocToFree_, 1, MPI_AINT, windowRank_, target, 1, MPI_AINT, winControl_) ; 
    106108        MPI_Win_unlock(windowRank_,winControl_) ;  
     109        if (info.isActive(logProtocol)) CTimer::get("Send bloc to free").suspend() ; 
    107110        lastBlocToFree_ = 0 ;         
    108111      } 
     
    126129    if (!pendingRmaRequests_.empty()) 
    127130    { 
    128       int flag ; 
     131      int flag ;     
     132 
     133      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").resume() ; 
    129134      MPI_Testall(pendingRmaRequests_.size(), pendingRmaRequests_.data(), &flag, pendingRmaStatus_.data()) ; 
     135      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Testall").suspend() ; 
     136       
    130137      if (flag==true)  
    131138      { 
     
    134141        { 
    135142          info(logProtocol)<<"unlock window "<<win<<endl ; 
     143          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").resume() ; 
    136144          MPI_Win_unlock(windowRank_,windows_[win]) ;  
     145          if (info.isActive(logProtocol)) CTimer::get("transfer unlock").suspend() ; 
    137146        } 
    138147        windowsLocked_.clear() ; 
    139            
     148         
     149 
     150        if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).suspend() ; 
     151        if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).suspend() ; 
     152         
     153        size_t transferedSize = 0 ; 
     154        for(auto& count : pendingRmaCount_) transferedSize+=count ; 
     155 
     156        if (info.isActive(logProtocol)) 
     157        { 
     158          double time = CTimer::get("lastTransfer from "+std::to_string(clientRank_)).getCumulatedTime() ; 
     159          info(logProtocol)<<"Tranfer message from rank : "<<clientRank_<<"  nbBlocs : "<< pendingRmaStatus_.size() 
     160                           << "  total count = "<<transferedSize<<"  duration : "<<time<<" s" 
     161                           << "  Bandwith : "<< transferedSize/time<< "byte/s"<<endl ; 
     162          CTimer::get("lastTransfer from "+std::to_string(clientRank_)).reset() ; 
     163         } 
     164 
    140165        isLocked_=false ; 
    141166        pendingRmaRequests_.clear() ; 
    142167        pendingRmaStatus_.clear() ; 
     168        pendingRmaCount_.clear() ; 
    143169        completedEvents_.insert(onTransferEvents_.begin(),onTransferEvents_.end()) ; 
    144170         
     
    195221        if (bufferResize_.front().first==timeline) 
    196222        { 
    197           currentBufferSize_=bufferResize_.front().second ; 
     223          currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 
    198224          info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ; 
    199225          bufferResize_.pop_front() ; 
     
    216242       
    217243      if (isLocked_) ERROR("void COneSidedServerBuffer::transferEvents(void)",<<"windows is Locked"); 
     244       
     245      if (info.isActive(logProtocol)) CTimer::get("transfer MPI_Rget from "+std::to_string(clientRank_)).resume() ; 
     246      if (info.isActive(logProtocol)) CTimer::get("lastTransfer from "+std::to_string(clientRank_)).resume() ; 
    218247      for(auto& bloc : blocs)  
    219248      { 
     
    221250        if (windowsLocked_.count(win)==0)  
    222251        { 
     252          info(logProtocol)<<"lock window "<<win<<endl ; 
     253          if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ; 
    223254          MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ; 
     255          if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ; 
    224256          windowsLocked_.insert(win) ; 
    225257        } 
     
    233265        pendingBlocs_.erase(pendingBlocs_.begin()) ; 
    234266         
    235 //        break ; // transfering just one event temporary => to remove 
     267        //  break ; // transfering just one event temporary => to remove 
    236268         
    237269        if (pendingBlocs_.empty()) break ; // no more blocs to tranfer => exit loop 
     
    244276          if (bufferResize_.front().first==timeline) 
    245277          { 
    246             currentBufferSize_=bufferResize_.front().second ; 
     278            currentBufferSize_=bufferResize_.front().second * bufferServerFactor_ ; 
    247279            info(logProtocol)<<"Received new buffer size="<<currentBufferSize_<<"  at timeline="<<timeline<<endl ; 
    248280            bufferResize_.pop_front() ; 
     
    259291            if (windowsLocked_.count(win)==0)  
    260292            { 
     293              info(logProtocol)<<"lock window "<<win<<endl ; 
     294              if (info.isActive(logProtocol)) CTimer::get("transfer lock").resume() ; 
    261295              MPI_Win_lock(MPI_LOCK_SHARED, windowRank_, 0, windows_[win]) ; 
     296              if (info.isActive(logProtocol)) CTimer::get("transfer lock").suspend() ; 
    262297              windowsLocked_.insert(win) ; 
    263298            } 
     
    331366    MPI_Request request ; 
    332367    MPI_Aint offsetAddr=MPI_Aint_add(addr, offset) ; 
    333     info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ; 
    334     info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 
    335              <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; 
     368    if (info.isActive(logProtocol)) 
     369    { 
     370      info(logProtocol)<<"receive Bloc from client "<<clientRank_<<" : timeline="<<timeline<<"  addr="<<addr<<"  count="<<count<<" buffer="<<buffer<<"  start="<<start<<endl ; 
     371      info(logProtocol)<<"check dest buffers ; start_buffer="<<static_cast<void*>(buffer->getBuffer())<<"  end_buffer="<<static_cast<void*>(buffer->getBuffer()+buffer->getSize()-1) 
     372               <<"  start="<<static_cast<void*>(buffer->getBuffer()+start)<<"   end="<<static_cast<void*>(buffer->getBuffer()+start+count-1)<<endl ; 
     373    } 
     374    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").resume() ; 
    336375    MPI_Rget(buffer->getBuffer()+start, count, MPI_CHAR, windowRank_, offsetAddr, count, MPI_CHAR, windows_[window], &request) ; 
     376    if (info.isActive(logProtocol)) CTimer::get("MPI_Rget").suspend() ; 
    337377    pendingRmaRequests_.push_back(request) ; 
     378    pendingRmaCount_.push_back(count) ; 
    338379    onTransferEvents_[timeline].push_back({buffer,start,count,addr}) ; 
    339380  } 
     
    348389     
    349390    ostringstream outStr ; 
    350     outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline<<"  nbBlocs="<<completedEvent.size()<<endl ; 
     391    if (info.isActive(logProtocol)) outStr<<"Received Event from client "<<clientRank_<<"  timeline="<<timeline 
     392                                          <<"  nbBlocs="<<completedEvent.size()<<endl ; 
    351393    int i=0 ; 
    352394    MPI_Aint addr ; 
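
On the server buffer side, each RMA transfer is now timed per client ("transfer MPI_Rget from <rank>", "lastTransfer from <rank>"), the byte count of every posted bloc is recorded in pendingRmaCount_, and once MPI_Testall reports completion the total size divided by the cumulated time gives the bandwidth written to the protocol log. A small self-contained sketch of that bookkeeping, with a simulateRget lambda standing in for the real MPI_Rget calls:

#include <chrono>
#include <iostream>
#include <numeric>
#include <thread>
#include <vector>

int main()
{
  std::vector<long long> pendingRmaCount;           // bytes of each posted bloc
  auto begin = std::chrono::steady_clock::now();    // "lastTransfer" timer resumed

  auto simulateRget = [&](long long count)
  {
    std::this_thread::sleep_for(std::chrono::milliseconds(5));   // pretend data moves
    pendingRmaCount.push_back(count);
  };
  simulateRget(1 << 20);
  simulateRget(1 << 18);

  double seconds = std::chrono::duration<double>(std::chrono::steady_clock::now() - begin).count();
  long long transferedSize = std::accumulate(pendingRmaCount.begin(), pendingRmaCount.end(), 0LL);

  std::cout << "Transfer : nbBlocs=" << pendingRmaCount.size()
            << "  total count=" << transferedSize
            << "  duration=" << seconds << " s"
            << "  bandwidth=" << transferedSize / seconds << " byte/s\n";
}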
  • XIOS3/trunk/src/transport/one_sided_server_buffer.hpp

    r2343 r2399  
    132132      size_t currentBufferSize_=0 ; 
    133133      double growingFactor_ = 2. ; 
     134      double bufferServerFactor_=10. ; 
    134135       
    135136      std::list<CBuffer*> buffers_ ; 
     
    144145      vector<MPI_Request> pendingRmaRequests_ ; 
    145146      vector<MPI_Status> pendingRmaStatus_ ; 
     147      vector<int> pendingRmaCount_ ; 
    146148 
    147149      map<size_t, list<SBloc>> onTransferEvents_ ; // map<size_t timeline, list<pair<char* bloc, int count>>> 
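
Finally, one_sided_server_buffer.hpp gains bufferServerFactor_ (default 10) and the pendingRmaCount_ vector used above. The factor scales the buffer size announced by the client when the server resizes its own buffer, so several timelines can be in flight before the client has to wait. A tiny illustration of the sizing arithmetic; the names mirror the header members, and the 4 MiB request is hypothetical:

#include <cstddef>
#include <iostream>

int main()
{
  double bufferServerFactor = 10.;        // bufferServerFactor_ in the header
  std::size_t requestedSize = 4 << 20;    // size announced by the client (4 MiB, hypothetical)

  std::size_t currentBufferSize = static_cast<std::size_t>(requestedSize * bufferServerFactor);
  std::cout << "Received new buffer size=" << currentBufferSize << '\n';   // ten times the requested size
}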