Changeset 2455


Ignore:
Timestamp:
01/09/23 18:41:19 (17 months ago)
Author:
ymipsl
Message:

Make coupling working again.
YM

Location:
XIOS3/dev/XIOS_FILE_SERVICES/src
Files:
2 added
9 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/dev/XIOS_FILE_SERVICES/src/buffer_client.cpp

    r2324 r2455  
    9494  void CClientBuffer::attachWindows(vector<MPI_Win>& windows) 
    9595  { 
     96    isAttachedWindows_=true ; 
    9697    windows_=windows ; 
    9798    if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ; 
     
    170171  bool CClientBuffer::isBufferFree(StdSize size) 
    171172  { 
    172    
     173    if (!isAttachedWindows_) return false; 
     174 
    173175    lockBuffer(); 
    174176    count=*bufferCount[current] ; 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/buffer_client.hpp

    r2260 r2455  
    3434      void fixBuffer(void) { isGrowableBuffer_=false ;} 
    3535      void attachWindows(vector<MPI_Win>& windows) ; 
     36      bool isAttachedWindows(void) { return isAttachedWindows_ ;} 
    3637    private: 
    3738       void resizeBuffer(size_t newSize) ; 
     
    6970      const MPI_Comm interComm; 
    7071      std::vector<MPI_Win> windows_ ; 
    71       bool hasWindows ; 
    72  
     72      bool hasWindows=false ; 
     73      bool isAttachedWindows_=false ; 
    7374      double latency_=0 ; 
    7475      double lastCheckedWithNothing_=0 ; 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/manager/ressources_manager.cpp

    r2454 r2455  
    22#include "server.hpp" 
    33#include "servers_ressource.hpp" 
     4#include "token_manager.hpp" 
    45#include "timer.hpp" 
    56 
     
    2122    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ;  
    2223    else commRank=0 ; 
     24    tokenManager_ = new CTokenManager(xiosComm_,commRank) ; 
     25 
    2326    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ; 
    2427 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/manager/ressources_manager.hpp

    r2453 r2455  
    1111#include "window_manager.hpp" 
    1212#include "pool_ressource.hpp" 
     13#include "token_manager.hpp" 
    1314 
    1415 
     
    5354    void waitPoolRegistration(const string& poolId) ; 
    5455     
     56 
    5557    void registerServerLeader(int leaderRank) ; 
    5658    void registerRessourcesSize(int size) ; 
    5759    void registerPoolClient(const std::string& poolId,int size,int leader) ; 
    5860    void registerPoolServer(const std::string& poolId,int size,int leader) ; 
     61    CTokenManager* getTokenManager(void) {return tokenManager_ ;}  
    5962 
    6063    int managerGlobalLeader_ ; 
     
    6366 
    6467    CWindowManager* winNotify_ ; 
     68    CTokenManager* tokenManager_ ; 
    6569 
    6670    const size_t maxBufferSize_=1024*1024 ; 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/context.cpp

    r2453 r2455  
    738738      for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 
    739739      for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); 
    740       for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 
    741       for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 
     740      //for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 
     741      //for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 
     742      for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(); 
     743      for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(); 
    742744    } 
    743745    setCurrent(getId()) ; 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/transport/legacy_context_client.cpp

    r2343 r2455  
    1313#include "server.hpp" 
    1414#include "services.hpp" 
     15#include "ressources_manager.hpp" 
    1516#include <boost/functional/hash.hpp> 
    1617#include <random> 
     
    110111      } 
    111112       
     113      MPI_Request req ; 
     114      MPI_Status status ; 
     115      MPI_Ibarrier(intraComm,&req) ; 
     116      int flag ; 
     117      MPI_Test(&req,&flag,&status) ; 
     118      while(!flag)  
     119      { 
     120        callGlobalEventLoop() ; 
     121        MPI_Test(&req,&flag,&status) ; 
     122      } 
     123 
     124 
    112125      timeLine++; 
    113126    } 
     
    126139     * \return whether the already allocated buffers could be used 
    127140    */ 
    128     bool CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
    129                                     bool nonBlocking /*= false*/) 
     141    void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers) 
    130142    { 
    131143      list<int>::const_iterator itServer, itSize; 
     
    134146      list<CClientBuffer*>::iterator itBuffer; 
    135147      bool areBuffersFree; 
    136  
     148      
    137149      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 
    138150      { 
     
    140152        if (it == buffers.end()) 
    141153        { 
     154          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ; 
     155          size_t token = tokenManager->getToken() ; 
     156          while (!tokenManager->lockToken(token)) callGlobalEventLoop() ; 
    142157          newBuffer(*itServer); 
    143158          it = buffers.find(*itServer); 
     159          checkAttachWindows(it->second,it->first) ; 
     160          tokenManager->unlockToken(token) ; 
    144161        } 
    145162        bufferList.push_back(it->second); 
     
    177194        } 
    178195 
    179       } while (!areBuffersFree && !nonBlocking); 
     196      } while (!areBuffersFree); 
    180197      CTimer::get("Blocking time").suspend(); 
    181198 
    182       if (areBuffersFree) 
    183       { 
    184         for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    185           retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 
    186       } 
    187       return areBuffersFree; 
     199      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     200        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 
    188201   } 
    189202 
     
    225238      bufOut->put(sendBuff, 4);  
    226239      buffer->checkBuffer(true); 
    227        
     240/* 
    228241       // create windows dynamically for one-sided 
    229242      if (!isAttachedModeEnabled()) 
     
    254267      buffer->attachWindows(windows_[rank]) ; 
    255268      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ; 
    256         
    257    } 
    258  
     269  */      
     270   } 
     271 
     272   void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank) 
     273   { 
     274      if (!buffer->isAttachedWindows()) 
     275      { 
     276           // create windows dynamically for one-sided 
     277        if (!isAttachedModeEnabled()) 
     278        {  
     279          CTimer::get("create Windows").resume() ; 
     280          MPI_Comm interComm ; 
     281          MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 
     282          MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 
     283          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
     284          MPI_Comm_free(&interComm) ; 
     285          windows_[rank].resize(2) ; 
     286       
     287          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
     288          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
     289       
     290          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);    
     291          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
     292 
     293          CTimer::get("create Windows").suspend() ; 
     294          buffer->attachWindows(windows_[rank]) ; 
     295          MPI_Barrier(winComm_[rank]) ; 
     296        } 
     297        else 
     298        { 
     299          winComm_[rank] = MPI_COMM_NULL ; 
     300          windows_[rank].resize(2) ; 
     301          windows_[rank][0] = MPI_WIN_NULL ; 
     302          windows_[rank][1] = MPI_WIN_NULL ; 
     303          buffer->attachWindows(windows_[rank]) ; 
     304        } 
     305 
     306      } 
     307    } 
     308 
     309 
     310   
    259311   /*! 
    260312   Verify state of buffers. Buffer is under pending state if there is no message on it 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/transport/legacy_context_client.hpp

    r2343 r2455  
    4545 
    4646      // Functions to set/get buffers 
    47       bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
     47      void getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers); 
    4848      void newBuffer(int rank); 
     49      void checkAttachWindows(CClientBuffer* buffer , int rank) ; 
    4950      bool checkBuffers(list<int>& ranks); 
    5051      bool checkBuffers(void); 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/transport/legacy_context_server.cpp

    r2433 r2455  
    9595 
    9696    traceOff(); 
    97     MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status); 
     97    MPI_Improbe(MPI_ANY_SOURCE, 20, interComm,&flag,&message, &status); 
    9898    traceOn(); 
    9999    if (flag==true) listenPendingRequest(message, status) ; 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/transport/one_sided_context_client.hpp

    r2343 r2455  
    4141 
    4242      // Functions to set/get buffers 
    43       bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
     43//      bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
    4444      void newBuffer(int rank); 
    4545      bool checkBuffers(list<int>& ranks); 
Note: See TracChangeset for help on using the changeset viewer.