Ignore:
Timestamp:
08/29/23 17:24:04 (10 months ago)
Author:
ymipsl
Message:

Major update :

  • New method to lock and unlock one-sided windows (window_dynamic) to avoid network overhead
  • Introducing multithreading on server sided to manage more efficiently dead-lock occuring (similar to co-routine which will be available and implemented in futur c++ standard), based on c++ threads
  • Suprression of old "attached mode" which is replaced by online writer and reder filters

YM

File:
1 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk/src/transport/legacy_context_server.cpp

    r2528 r2547  
    4343    finished=false; 
    4444 
    45     if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 
    46     else interCommMerged_ = interComm_; // interComm_ is yet an intracommunicator in attached 
     45    MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ; 
    4746    MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows 
    4847     
     
    5049 
    5150    pureOneSided=CXios::getin<bool>("pure_one_sided",false); // pure one sided communication (for test) 
    52     if (isAttachedModeEnabled()) pureOneSided=false ; // no one sided in attach mode 
    53        
    5451  } 
    5552  
     
    115112      remoteHashId_ = recvBuff[0] ; 
    116113      StdSize buffSize = recvBuff[1]; 
    117       vector<MPI_Aint> winAdress(2) ; 
    118       winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ; 
     114      vector<MPI_Aint> winBufferAddress(2) ; 
     115      winBufferAddress[0]=recvBuff[2] ; winBufferAddress[1]=recvBuff[3] ; 
    119116      mapBufferSize_.insert(std::make_pair(rank, buffSize)); 
    120117 
    121118      // create windows dynamically for one-sided 
    122       if (!isAttachedModeEnabled()) 
    123       {  
    124         CTimer::get("create Windows").resume() ; 
    125         MPI_Comm interComm ; 
    126         MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ; 
    127         MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 
    128         CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
    129         MPI_Comm_free(&interComm) ; 
    130         windows_[rank].resize(2) ; 
    131         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
    132         CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
    133         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 
    134         CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
    135         CTimer::get("create Windows").suspend() ; 
    136         MPI_Barrier(winComm_[rank]) ; 
    137       } 
    138       else 
    139       { 
    140         winComm_[rank] = MPI_COMM_NULL ; 
    141         windows_[rank].resize(2) ; 
    142         windows_[rank][0] = MPI_WIN_NULL ; 
    143         windows_[rank][1] = MPI_WIN_NULL ; 
    144       }    
    145  
    146       it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first; 
     119      int dummy ; 
     120      MPI_Send(&dummy, 0, MPI_INT, rank, 21,interCommMerged_) ; 
     121      CTimer::get("create Windows").resume() ; 
     122      MPI_Comm interComm ; 
     123      int tag = 0 ; 
     124      MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, tag , &interComm) ; 
     125      MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ; 
     126      MPI_Comm_free(&interComm) ; 
     127      windows_[rank].resize(2) ; 
     128      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
     129      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
     130      //MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]); 
     131      //CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
     132      windows_[rank][0] = new CWindowDynamic() ; 
     133      windows_[rank][1] = new CWindowDynamic() ; 
     134      windows_[rank][0] -> create(winComm_[rank]) ; 
     135      windows_[rank][1] -> create(winComm_[rank]) ; 
     136      windows_[rank][0] -> setWinBufferAddress(winBufferAddress[0],0) ; 
     137      windows_[rank][1] -> setWinBufferAddress(winBufferAddress[1],0) ; 
     138      CTimer::get("create Windows").suspend() ; 
     139      CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
     140      MPI_Barrier(winComm_[rank]) ; 
     141 
     142      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winBufferAddress, 0, buffSize)))).first; 
    147143      lastTimeLine[rank]=0 ; 
    148144      itLastTimeLine=lastTimeLine.begin() ; 
     
    230226  { 
    231227    CTimer::get("CLegacyContextServer::getBufferFromClient").resume() ; 
    232     if (!isAttachedModeEnabled()) // one sided desactivated in attached mode 
    233     {   
    234       int rank ; 
    235       char *buffer ; 
    236       size_t count ;  
    237  
    238       if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 
    239       for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 
    240       { 
    241         rank=itLastTimeLine->first ; 
    242         if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 
    243         { 
    244           if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 
    245           if (count >= 0) ++itLastTimeLine ; 
    246           break ; 
    247         } 
     228 
     229    int rank ; 
     230    char *buffer ; 
     231    size_t count ;  
     232 
     233    if (itLastTimeLine==lastTimeLine.end()) itLastTimeLine=lastTimeLine.begin() ; 
     234    for(;itLastTimeLine!=lastTimeLine.end();++itLastTimeLine) 
     235    { 
     236      rank=itLastTimeLine->first ; 
     237      if (itLastTimeLine->second < timeLine &&  pendingRequest.count(rank)==0 && buffers[rank]->isBufferEmpty()) 
     238      { 
     239        if (buffers[rank]->getBufferFromClient(timeLine, buffer, count)) processRequest(rank, buffer, count); 
     240        if (count >= 0) ++itLastTimeLine ; 
     241        break ; 
    248242      } 
    249243    } 
     
    280274      { 
    281275        size_t newSize ; 
    282         vector<MPI_Aint> winAdress(2) ; 
    283         newBuffer>>newSize>>winAdress[0]>>winAdress[1] ; 
     276        vector<MPI_Aint> winBufferAdress(2) ; 
     277        newBuffer>>newSize>>winBufferAdress[0]>>winBufferAdress[1] ; 
    284278        buffers[rank]->freeBuffer(count) ; 
    285279        delete buffers[rank] ; 
    286         buffers[rank] = new CServerBuffer(windows_[rank], winAdress, 0, newSize) ; 
     280        windows_[rank][0] -> setWinBufferAddress(winBufferAdress[0],0) ; 
     281        windows_[rank][1] -> setWinBufferAddress(winBufferAdress[1],0) ; 
     282        buffers[rank] = new CServerBuffer(windows_[rank], winBufferAdress, 0, newSize) ; 
    287283        info(100)<<"Context id "<<context->getId()<<" : Receive ChangeBufferSize from client rank "<<rank 
    288                  <<"  newSize : "<<newSize<<" Address : "<<winAdress[0]<<" & "<<winAdress[1]<<endl ; 
     284                 <<"  newSize : "<<newSize<<" Address : "<<winBufferAdress[0]<<" & "<<winBufferAdress[1]<<endl ; 
    289285      } 
    290286      else 
     
    309305    CEventServer* event; 
    310306     
    311 //    if (context->isProcessingEvent()) return ; 
    312307    if (isProcessingEvent_) return ; 
    313     if (isAttachedModeEnabled()) 
    314       if (!CXios::getDaemonsManager()->isScheduledContext(remoteHashId_)) return ; 
    315308 
    316309    it=events.find(currentTimeLine); 
     
    321314      if (event->isFull()) 
    322315      { 
    323         if (!scheduled && !isAttachedModeEnabled()) // Skip event scheduling for attached mode and reception on client side 
     316        if (!scheduled) 
    324317        { 
    325318          eventScheduler_->registerEvent(currentTimeLine,hashId); 
     
    327320          scheduled=true; 
    328321        } 
    329         else if (isAttachedModeEnabled() || eventScheduler_->queryEvent(currentTimeLine,hashId) ) 
     322        else if (eventScheduler_->queryEvent(currentTimeLine,hashId) ) 
    330323        { 
    331324          if (!enableEventsProcessing && isCollectiveEvent(*event)) return ; 
     
    346339          } 
    347340           
    348           if (CXios::checkEventSync) 
     341          if (CXios::checkEventSync && context->getServiceType()!=CServicesManager::CLIENT) 
    349342          { 
    350343            int typeId, classId, typeId_in, classId_in; 
     
    364357          } 
    365358 
    366           if (!isAttachedModeEnabled()) eventScheduler_->popEvent() ; 
    367           //MPI_Barrier(intraComm) ; 
    368          // When using attached mode, synchronise the processes to avoid that differents event be scheduled by differents processes 
    369          // The best way to properly solve this problem will be to use the event scheduler also in attached mode 
    370          // for now just set up a MPI barrier 
    371 //ym to be check later 
    372 //         if (!eventScheduler_ && CXios::isServer) MPI_Barrier(intraComm) ; 
    373  
    374 //         context->setProcessingEvent() ; 
    375          isProcessingEvent_=true ; 
    376          CTimer::get("Process events").resume(); 
    377          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 
    378          dispatchEvent(*event); 
    379          CTimer::get("Process events").suspend(); 
    380          isProcessingEvent_=false ; 
    381 //         context->unsetProcessingEvent() ; 
    382          pendingEvent=false; 
    383          delete event; 
    384          events.erase(it); 
    385          currentTimeLine++; 
    386          scheduled = false; 
    387          if (isAttachedModeEnabled()) CXios::getDaemonsManager()->unscheduleContext() ; 
     359          isProcessingEvent_=true ; 
     360          CTimer::get("Process events").resume(); 
     361          info(100)<<"Context id "<<context->getId()<<" : Process Event "<<currentTimeLine<<" of class "<<event->classId<<" of type "<<event->type<<endl ; 
     362          eventScheduler_->popEvent() ; 
     363          dispatchEvent(*event); 
     364          CTimer::get("Process events").suspend(); 
     365          isProcessingEvent_=false ; 
     366          pendingEvent=false; 
     367          delete event; 
     368          events.erase(it); 
     369          currentTimeLine++; 
     370          scheduled = false; 
    388371        } 
    389372      } 
     
    409392  void CLegacyContextServer::freeWindows() 
    410393  { 
    411     //if (!isAttachedModeEnabled()) 
    412     //{ 
    413     //  for(auto& it : winComm_) 
    414     //  { 
    415     //    int rank = it.first ; 
    416     //    MPI_Win_free(&windows_[rank][0]); 
    417     //    MPI_Win_free(&windows_[rank][1]); 
    418     //    MPI_Comm_free(&winComm_[rank]) ; 
    419     //  } 
    420     //} 
     394    for(auto& it : winComm_) 
     395    { 
     396      int rank = it.first ; 
     397      delete windows_[rank][0]; 
     398      delete windows_[rank][1]; 
     399    } 
    421400  } 
    422401 
Note: See TracChangeset for help on using the changeset viewer.