Changeset 2259 for XIOS


Timestamp:
11/17/21 16:56:04 (3 years ago)
Author:
ymipsl
Message:

Improvement of the one-sided protocol.
Windows are now created on the fly for each client-server connection.
YM

Location:
XIOS/dev/dev_ym/XIOS_COUPLING/src
Files:
6 edited

Legend:

  unmarked lines are unmodified
  lines prefixed with + were added in r2259
  lines prefixed with - were removed in r2259
  … marks skipped lines
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.cpp

    r2258 r2259  
   size_t CClientBuffer::maxRequestSize = 0;

-  CClientBuffer::CClientBuffer(MPI_Comm interComm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
+  CClientBuffer::CClientBuffer(MPI_Comm interComm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize)
     : interComm(interComm)
     , clientRank_(0)
 …
     , pending(false)
     , hasWindows(false)
-    , windows_(windows)
-  {
-    if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
-    else hasWindows=true ;
+  {
+    /*
+     if (windows[0]==MPI_WIN_NULL && windows[1]==MPI_WIN_NULL) hasWindows=false ;
+     else hasWindows=true ;
+    */

     MPI_Alloc_mem(bufferSize+headerSize_, MPI_INFO_NULL, &bufferHeader[0]) ;
 …
   MPI_Aint CClientBuffer::getWinAddress(int i)
   {
-    MPI_Aint address ;
-
-    if (hasWindows) MPI_Get_address(bufferHeader[i], &address) ;
-    else address=0 ;
-
-    return address ;
-  }
+    MPI_Aint address ;
+    MPI_Get_address(bufferHeader[i], &address) ;
+    return address ;
+  }
+
+  void CClientBuffer::attachWindows(vector<MPI_Win>& windows)
+  {
+    windows_=windows ;
+    if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ;
+    else hasWindows=true ;
+
+    if (hasWindows)
+    {
+      MPI_Aint buffSize=bufferSize+headerSize_ ;
+      MPI_Win_attach(windows_[0], bufferHeader[0], buffSize) ;
+      MPI_Win_attach(windows_[1], bufferHeader[1], buffSize) ;
+
+      MPI_Group group ;
+      int groupSize,groupRank ;
+      MPI_Win_get_group(windows_[0], &group) ;
+      MPI_Group_size(group, &groupSize) ;
+      MPI_Group_rank(group, &groupRank) ;
+      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
+
+      MPI_Win_get_group(windows_[1], &group) ;
+      MPI_Group_size(group, &groupSize) ;
+      MPI_Group_rank(group, &groupRank) ;
+      if (groupRank!=clientRank_) ERROR("CClientBuffer::CClientBuffer",<< " ClientRank != groupRank "<<clientRank_<<" "<<groupRank);
+
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
+
+      MPI_Win_unlock(clientRank_, windows_[1]) ;
+      MPI_Win_unlock(clientRank_, windows_[0]) ;
+    }
+  }

   CClientBuffer::~CClientBuffer()
 …
     int flag;

-    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
-    MPI_Win_unlock(clientRank_, windows_[0]) ;
-
-    MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
-    MPI_Win_unlock(clientRank_, windows_[1]) ;
-
+    if (hasWindows)
+    {
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[0]) ;
+      MPI_Win_unlock(clientRank_, windows_[0]) ;
+
+      MPI_Win_lock(MPI_LOCK_EXCLUSIVE, clientRank_, 0, windows_[1]) ;
+      MPI_Win_unlock(clientRank_, windows_[1]) ;
+    }
+
     if (pending)
     {
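
The new attachWindows() path above relies on MPI dynamic windows: the buffer is allocated first, attached afterwards with MPI_Win_attach, and the absolute address returned by getWinAddress() (via MPI_Get_address) is what the remote side must use as displacement for its RMA calls. A minimal standalone sketch of that pattern, independent of the XIOS classes and runnable with two MPI processes, could look like this (the local lock/unlock done in attachWindows() and checkBuffer() is omitted for brevity):

  // Sketch only: rank 0 exposes one integer through a dynamic window, rank 1 reads it.
  #include <mpi.h>
  #include <cstdio>

  int main(int argc, char** argv)
  {
    MPI_Init(&argc, &argv);
    int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // A dynamic window starts empty; memory is attached to it later.
    MPI_Win win;
    MPI_Win_create_dynamic(MPI_INFO_NULL, MPI_COMM_WORLD, &win);

    MPI_Aint address = 0;
    int* buffer = nullptr;
    if (rank == 0)
    {
      // Allocate, attach, and publish the absolute address (what getWinAddress() returns).
      MPI_Alloc_mem(sizeof(int), MPI_INFO_NULL, &buffer);
      *buffer = 42;
      MPI_Win_attach(win, buffer, sizeof(int));
      MPI_Get_address(buffer, &address);
    }
    // With dynamic windows the target displacement is an absolute address, so it has to be
    // communicated explicitly (XIOS exchanges it in the initial 4-element message, see context_server.cpp below).
    MPI_Bcast(&address, 1, MPI_AINT, 0, MPI_COMM_WORLD);

    if (rank == 1)
    {
      int value = 0;
      MPI_Win_lock(MPI_LOCK_SHARED, 0, 0, win);
      MPI_Get(&value, 1, MPI_INT, 0, address, 1, MPI_INT, win);
      MPI_Win_unlock(0, win);
      std::printf("got %d\n", value);  // expected: 42
    }

    MPI_Barrier(MPI_COMM_WORLD);
    if (rank == 0) { MPI_Win_detach(win, buffer); MPI_Free_mem(buffer); }
    MPI_Win_free(&win);
    MPI_Finalize();
    return 0;
  }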
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/buffer_client.hpp

    r2258 r2259  
      static size_t maxRequestSize;

-     CClientBuffer(MPI_Comm intercomm, vector<MPI_Win>& windows, int clientRank, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize);
+     CClientBuffer(MPI_Comm intercomm, int serverRank, StdSize bufferSize, StdSize estimatedMaxEventSize);
      ~CClientBuffer();
//    void createWindows(MPI_Comm oneSidedComm) ;
 …
      void fixBufferSize(size_t bufferSize) { newBufferSize_=bufferSize ; isGrowableBuffer_=false ; resizingBufferStep_=1 ;}
      void fixBuffer(void) { isGrowableBuffer_=false ;}
+     void attachWindows(vector<MPI_Win>& windows) ;
    private:
      void resizeBuffer(size_t newSize) ;
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.cpp

    r2258 r2259  
      computeLeader(clientRank, clientSize, serverSize, ranksServerLeader, ranksServerNotLeader);

-     if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged) ;
-
-     if (!isAttachedModeEnabled())
-     {
-
-       CTimer::get("create Windows").resume() ;
-
-       // We create dummy pair of intercommunicator between clients and server
-       // Why ? Just because on openMPI, it reduce the creation time of windows otherwhise which increase quadratically
-       // We don't know the reason
-       double time ;
-       MPI_Comm commSelf ;
-       MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf) ;
-       MPI_Comm interComm ;
-       winComm_.resize(serverSize) ;
-       windows_.resize(serverSize) ;
-       for(int rank=0; rank<serverSize; rank++)
-       {
-         time=MPI_Wtime() ;
-         MPI_Intercomm_create(commSelf, 0, interCommMerged, clientSize+rank, 0, &interComm) ;
-         MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
-         windows_[rank].resize(2) ;
-         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
-         MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
-         time=MPI_Wtime()-time ;
-         info(100)<< "MPI_Win_create_dynamic : client to server rank "<<rank<<" => "<<time/1e-6<<" us"<<endl ;
-       }
-       MPI_Comm_free(&commSelf) ;
-       CTimer::get("create Windows").resume() ;
-     }
+     if (flag) MPI_Intercomm_merge(interComm_,false, &interCommMerged_) ;
+
+     MPI_Comm_split(intraComm_,clientRank,clientRank, &commSelf_) ; // for windows

      auto time=chrono::system_clock::now().time_since_epoch().count() ;
 …
      }

-     vector<MPI_Win> Wins(2,MPI_WIN_NULL) ;
-     if (!isAttachedModeEnabled()) Wins=windows_[rank] ;
-
-     CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, Wins, clientRank, rank, mapBufferSize_[rank], maxEventSizes[rank]);
+     CClientBuffer* buffer = buffers[rank] = new CClientBuffer(interComm, rank, mapBufferSize_[rank], maxEventSizes[rank]);
      if (isGrowableBuffer_) buffer->setGrowableBuffer(1.2) ;
      else buffer->fixBuffer() ;
 …
      bufOut->put(sendBuff, 4);
      buffer->checkBuffer(true);
-
+
+     // create windows dynamically for one-sided
+     if (!isAttachedModeEnabled())
+     {
+       CTimer::get("create Windows").resume() ;
+       MPI_Comm interComm ;
+       MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ;
+       MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ;
+       MPI_Comm_free(&interComm) ;
+       windows_[rank].resize(2) ;
+       MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
+       MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
+       CTimer::get("create Windows").suspend() ;
+     }
+     else
+     {
+       winComm_[rank] = MPI_COMM_NULL ;
+       windows_[rank].resize(2) ;
+       windows_[rank][0] = MPI_WIN_NULL ;
+       windows_[rank][1] = MPI_WIN_NULL ;
+     }
+     buffer->attachWindows(windows_[rank]) ;
    }

 …
      if (!isAttachedModeEnabled())
      {
-       for(int rank=0; rank<serverSize; rank++)
-       {
+       for(auto& it : winComm_)
+       {
+         int rank = it.first ;
          MPI_Win_free(&windows_[rank][0]);
          MPI_Win_free(&windows_[rank][1]);
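
The up-front loop over all server ranks is gone from the constructor; instead, when a buffer is created for a given server rank, the code shown above builds a dedicated two-process communicator from commSelf_ and interCommMerged_ and creates the pair of dynamic windows on it. A standalone sketch of this per-connection pattern (not XIOS code; MPI_COMM_WORLD stands in for interCommMerged_, and exactly two processes play one client/server pair) could look like this:

  // Sketch only: run with exactly 2 MPI processes (rank 0 = client side, rank 1 = server side).
  #include <mpi.h>
  #include <vector>

  int main(int argc, char** argv)
  {
    MPI_Init(&argc, &argv);
    int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    // Communicator containing only the local process (analogue of commSelf_).
    MPI_Comm commSelf;
    MPI_Comm_split(MPI_COMM_WORLD, rank, rank, &commSelf);

    // Pair intercommunicator: each side names the other process as remote leader.
    // In XIOS the bridge communicator is interCommMerged_; MPI_COMM_WORLD stands in here.
    int remoteLeader = (rank == 0) ? 1 : 0;
    MPI_Comm interComm, pairComm;
    MPI_Intercomm_create(commSelf, 0, MPI_COMM_WORLD, remoteLeader, 0, &interComm);
    MPI_Intercomm_merge(interComm, /*high=*/ rank, &pairComm);
    MPI_Comm_free(&interComm);

    // Two dynamic windows per connection, as in windows_[rank][0] / windows_[rank][1].
    std::vector<MPI_Win> windows(2);
    MPI_Win_create_dynamic(MPI_INFO_NULL, pairComm, &windows[0]);
    MPI_Win_create_dynamic(MPI_INFO_NULL, pairComm, &windows[1]);

    // ... client buffers would be attached here, see CClientBuffer::attachWindows ...

    MPI_Win_free(&windows[0]);
    MPI_Win_free(&windows[1]);
    MPI_Comm_free(&pairComm);
    MPI_Comm_free(&commSelf);
    MPI_Finalize();
    return 0;
  }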
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_client.hpp

    r2258 r2259  
      MPI_Comm interComm; //!< Communicator of server group (interCommunicator)

-     MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+     MPI_Comm interCommMerged_; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+     MPI_Comm commSelf_ ; //!< Communicator for proc alone from interCommMerged

      MPI_Comm intraComm; //!< Communicator of client group
 …
      std::list<int> ranksServerNotLeader;

-     std::vector<MPI_Comm> winComm_ ; //! Window communicators
-     std::vector<std::vector<MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2]
+     std::map<int, MPI_Comm> winComm_ ; //! Window communicators
+     std::map<int, std::vector<MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers == windows[nbServers][2]
      bool isAttached_ ;
      CContextServer* associatedServer_ ; //!< The server associated to the pair client/server
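
Since windows are now created per connection, winComm_ and windows_ change from vectors pre-sized to the number of servers into maps keyed by server rank: an entry exists only for connections that have actually been opened, and the release loop iterates over exactly those entries. A short illustration of the access pattern (hypothetical helper functions, not XIOS code):

  #include <map>
  #include <vector>
  #include <mpi.h>

  // Containers mirroring context_client.hpp after r2259 (names are illustrative).
  std::map<int, MPI_Comm> winComm;
  std::map<int, std::vector<MPI_Win>> windows;

  void openConnection(int serverRank)
  {
    // operator[] creates the entry on first use, so only contacted server
    // ranks ever appear in the maps.
    windows[serverRank].assign(2, MPI_WIN_NULL);
    winComm[serverRank] = MPI_COMM_NULL;  // replaced by the real pair communicator later
  }

  void freeAllWindows()
  {
    // Visits only the connections that were actually created, which replaces
    // the old loop over every possible server rank.
    for (auto& it : winComm)
    {
      int rank = it.first;
      if (windows[rank][0] != MPI_WIN_NULL) MPI_Win_free(&windows[rank][0]);
      if (windows[rank][1] != MPI_WIN_NULL) MPI_Win_free(&windows[rank][1]);
    }
  }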
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.cpp

    r2258 r2259  


-   if (!isAttachedModeEnabled())
-   {
-     CTimer::get("create Windows").resume() ;
-
-     MPI_Intercomm_merge(interComm_,true,&interCommMerged) ;
-
-     double time ;
-     windows_.resize(clientSize_) ;
-     MPI_Comm commSelf ;
-     MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf) ;
-     MPI_Comm interComm ;
-     winComm_.resize(clientSize_) ;
-     for(int rank=0; rank<clientSize_ ; rank++)
-     {
-       time=MPI_Wtime() ;
-       MPI_Intercomm_create(commSelf, 0, interCommMerged, rank, 0 , &interComm) ;
-       MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
-       windows_[rank].resize(2) ;
-       MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
-       MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
-       time=MPI_Wtime()-time ;
-       info(100)<< "MPI_Win_create_dynamic : server to client rank "<<rank<<" => "<<time/1e-6<<" us"<<endl ;
-     }
-     MPI_Comm_free(&commSelf) ;
-     CTimer::get("create Windows").suspend() ;
-   }
-   else
-   {
-     winComm_.resize(clientSize_) ;
-     windows_.resize(clientSize_) ;
-     for(int rank=0; rank<clientSize_ ; rank++)
-     {
-       winComm_[rank] = MPI_COMM_NULL ;
-       windows_[rank].resize(2) ;
-       windows_[rank][0]=MPI_WIN_NULL ;
-       windows_[rank][1]=MPI_WIN_NULL ;
-     }
-   }
+   if (!isAttachedModeEnabled()) MPI_Intercomm_merge(interComm_,true,&interCommMerged_) ;
+   MPI_Comm_split(intraComm_, intraCommRank, intraCommRank, &commSelf_) ; // for windows

    itLastTimeLine=lastTimeLine.begin() ;
 …
    if (it==buffers.end()) // Receive the buffer size and allocate the buffer
    {
-      MPI_Aint recvBuff[4] ;
-      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
-      remoteHashId_ = recvBuff[0] ;
-      StdSize buffSize = recvBuff[1];
-      vector<MPI_Aint> winAdress(2) ;
-      winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
-      mapBufferSize_.insert(std::make_pair(rank, buffSize));
-      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
-      lastTimeLine[rank]=0 ;
-      itLastTimeLine=lastTimeLine.begin() ;
-      return true;
+      MPI_Aint recvBuff[4] ;
+      MPI_Mrecv(recvBuff, 4, MPI_AINT,  &message, &status);
+      remoteHashId_ = recvBuff[0] ;
+      StdSize buffSize = recvBuff[1];
+      vector<MPI_Aint> winAdress(2) ;
+      winAdress[0]=recvBuff[2] ; winAdress[1]=recvBuff[3] ;
+      mapBufferSize_.insert(std::make_pair(rank, buffSize));
+
+      // create windows dynamically for one-sided
+      if (!isAttachedModeEnabled())
+      {
+        CTimer::get("create Windows").resume() ;
+        MPI_Comm interComm ;
+        MPI_Intercomm_create(commSelf_, 0, interCommMerged_, rank, 0 , &interComm) ;
+        MPI_Intercomm_merge(interComm, true, &winComm_[rank]) ;
+        windows_[rank].resize(2) ;
+        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]);
+        MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);
+        CTimer::get("create Windows").suspend() ;
+      }
+      else
+      {
+        winComm_[rank] = MPI_COMM_NULL ;
+        windows_[rank].resize(2) ;
+        windows_[rank][0] = MPI_WIN_NULL ;
+        windows_[rank][1] = MPI_WIN_NULL ;
+      }
+
+      it=(buffers.insert(pair<int,CServerBuffer*>(rank,new CServerBuffer(windows_[rank], winAdress, 0, buffSize)))).first;
+      lastTimeLine[rank]=0 ;
+      itLastTimeLine=lastTimeLine.begin() ;
+
+      return true;
    }
    else
 …
    if (!isAttachedModeEnabled())
    {
-     for(int rank=0; rank<clientSize_; rank++)
-     {
+     for(auto& it : winComm_)
+     {
+       int rank = it.first ;
        MPI_Win_free(&windows_[rank][0]);
        MPI_Win_free(&windows_[rank][1]);
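
On the server side the same lazy setup is triggered by the first message received from a client: a 4-element MPI_Aint header (remote hash id, buffer size, and the two window base addresses) arrives through a matched probe/receive, and only then are winComm_[rank], the two dynamic windows and the CServerBuffer created for that rank. A standalone sketch of this first-contact handshake (not XIOS code, placeholder values, two processes) could look like this:

  // Sketch only: rank 0 plays the client announcing itself, rank 1 the server.
  #include <mpi.h>
  #include <cstdio>

  int main(int argc, char** argv)
  {
    MPI_Init(&argc, &argv);
    int rank; MPI_Comm_rank(MPI_COMM_WORLD, &rank);

    if (rank == 0)
    {
      // Client: header = { hash id, buffer size, window address 0, window address 1 }.
      // The values here are placeholders; XIOS fills them from the real buffers.
      MPI_Aint sendBuff[4] = { 1234, 4096, 0x1000, 0x2000 };
      MPI_Send(sendBuff, 4, MPI_AINT, 1, 20, MPI_COMM_WORLD);
    }
    else if (rank == 1)
    {
      // Server: matched probe to learn who is calling, then receive that exact message.
      MPI_Message message;
      MPI_Status status;
      MPI_Mprobe(MPI_ANY_SOURCE, 20, MPI_COMM_WORLD, &message, &status);
      int clientRank = status.MPI_SOURCE;

      MPI_Aint recvBuff[4];
      MPI_Mrecv(recvBuff, 4, MPI_AINT, &message, &status);

      // Here the real code creates winComm_[clientRank], the two dynamic windows
      // and a CServerBuffer using the received window addresses.
      std::printf("first contact from rank %d, buffer size %ld\n",
                  clientRank, (long)recvBuff[1]);
    }

    MPI_Finalize();
    return 0;
  }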
  • XIOS/dev/dev_ym/XIOS_COUPLING/src/context_server.hpp

    r2258 r2259  
    int clientSize_ ;

-   MPI_Comm interCommMerged; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+   MPI_Comm interCommMerged_; //!< Communicator of the client group + server group (intraCommunicator) needed for one sided communication.
+   MPI_Comm commSelf_ ; //!< Communicator for proc alone from interCommMerged

    map<int,CServerBuffer*> buffers ;
 …

    private:

      std::map<int, StdSize> mapBufferSize_;
-     std::vector<MPI_Comm>winComm_ ; //! Window communicators
-     std::vector<std::vector<MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side.
+     std::map<int,MPI_Comm> winComm_ ; //! Window communicators
+     std::map<int,std::vector<MPI_Win> >windows_ ; //! one sided mpi windows to expose client buffers to servers ; No memory will be attached on server side.
      CEventScheduler* eventScheduler_ ;
      bool isProcessingEvent_ ;