Changeset 2458


Ignore:
Timestamp:
01/25/23 16:59:46 (16 months ago)
Author:
ymipsl
Message:

Merge XIOS_FILE_SERVICE dev branch into trunk

YM

Location:
XIOS3/trunk
Files:
35 edited
5 copied

Legend:

Unmodified
Added
Removed
  • XIOS3/trunk

  • XIOS3/trunk/src/buffer_client.cpp

    r2324 r2458  
    9494  void CClientBuffer::attachWindows(vector<MPI_Win>& windows) 
    9595  { 
     96    isAttachedWindows_=true ; 
    9697    windows_=windows ; 
    9798    if (windows_[0]==MPI_WIN_NULL && windows_[1]==MPI_WIN_NULL) hasWindows=false ; 
     
    170171  bool CClientBuffer::isBufferFree(StdSize size) 
    171172  { 
    172    
     173    if (!isAttachedWindows_) return false; 
     174 
    173175    lockBuffer(); 
    174176    count=*bufferCount[current] ; 
  • XIOS3/trunk/src/buffer_client.hpp

    r2260 r2458  
    3434      void fixBuffer(void) { isGrowableBuffer_=false ;} 
    3535      void attachWindows(vector<MPI_Win>& windows) ; 
     36      bool isAttachedWindows(void) { return isAttachedWindows_ ;} 
    3637    private: 
    3738       void resizeBuffer(size_t newSize) ; 
     
    6970      const MPI_Comm interComm; 
    7071      std::vector<MPI_Win> windows_ ; 
    71       bool hasWindows ; 
    72  
     72      bool hasWindows=false ; 
     73      bool isAttachedWindows_=false ; 
    7374      double latency_=0 ; 
    7475      double lastCheckedWithNothing_=0 ; 
  • XIOS3/trunk/src/client.cpp

    r2418 r2458  
    188188      
    189189      CXios::launchDaemonsManager(false) ; 
    190       poolRessource_ = new CPoolRessource(clientComm, codeId) ; 
     190      poolRessource_ = new CPoolRessource(clientComm, codeId, false) ; 
    191191 
    192192      ///////////////////////////////////////// 
    193193      ///////////// PART 4 //////////////////// 
    194194      /////////////////////////////////////////       
    195        
     195/* 
     196      MPI_Request req ; 
     197      MPI_Status status ; 
     198      MPI_Ibarrier(xiosGlobalComm,&req) ; // be sure that all services are created now, could be remove later if more asynchronisity 
     199      int ok=false ; 
     200      while (!ok) 
     201      { 
     202        CXios::getDaemonsManager()->eventLoop() ; 
     203        MPI_Test(&req,&ok,&status) ; 
     204      } 
     205*/       
    196206      returnComm = clientComm ; 
    197207    } 
  • XIOS3/trunk/src/config/context_attribute.conf

    r1761 r2458  
    11DECLARE_ATTRIBUTE(StdString, output_dir) 
    22DECLARE_ATTRIBUTE(bool, attached_mode) 
     3DECLARE_ATTRIBUTE(StdString, default_pool) 
     4DECLARE_ATTRIBUTE(StdString, default_pool_writer) 
     5DECLARE_ATTRIBUTE(StdString, default_pool_reader) 
     6DECLARE_ATTRIBUTE(StdString, default_pool_gatherer) 
     7DECLARE_ATTRIBUTE(StdString, default_writer) 
     8DECLARE_ATTRIBUTE(StdString, default_gatherer) 
     9DECLARE_ATTRIBUTE(StdString, default_reader) 
     10DECLARE_ATTRIBUTE(bool, default_using_server2) 
  • XIOS3/trunk/src/config/file_attribute.conf

    r1493 r2458  
    3838DECLARE_ATTRIBUTE(StdString, uuid_name) 
    3939DECLARE_ATTRIBUTE(StdString, uuid_format) 
     40 
     41DECLARE_ATTRIBUTE(StdString, pool_writer) 
     42DECLARE_ATTRIBUTE(StdString, writer) 
     43DECLARE_ATTRIBUTE(StdString, pool_gatherer) 
     44DECLARE_ATTRIBUTE(StdString, gatherer) 
     45DECLARE_ATTRIBUTE(StdString, pool_reader) 
     46DECLARE_ATTRIBUTE(StdString, reader) 
     47DECLARE_ATTRIBUTE(bool, using_server2) 
  • XIOS3/trunk/src/config/node_type.conf

    r2408 r2458  
    128128#endif //__XIOS_CPoolNode__ 
    129129 
     130#ifdef __XIOS_CServiceNode__ 
     131   DECLARE_NODE(ServiceNode, node) 
     132#endif //__XIOS_CServiceNode__ 
     133 
    130134#ifdef __XIOS_CContext__ 
    131135   DECLARE_NODE_PAR(Context, context) 
  • XIOS3/trunk/src/config/pool_attribute.conf

    r2408 r2458  
    11DECLARE_ATTRIBUTE(StdString, name) 
     2DECLARE_ATTRIBUTE(int, nprocs) 
     3DECLARE_ATTRIBUTE(double, global_fraction) 
     4DECLARE_ATTRIBUTE(double, remain_fraction) 
     5DECLARE_ATTRIBUTE(bool, remain) 
  • XIOS3/trunk/src/group_factory_decl.cpp

    r2408 r2458  
    4949  macro(CExtractDomainGroup) 
    5050  macro(CPoolNodeGroup) 
     51  macro(CServiceNodeGroup) 
    5152 
    5253} 
  • XIOS3/trunk/src/group_template_decl.cpp

    r2408 r2458  
    3838  macro(ExtractDomain) 
    3939  macro(PoolNode) 
     40  macro(ServiceNode) 
    4041 
    4142} 
  • XIOS3/trunk/src/manager/pool_ressource.cpp

    r2404 r2458  
    1010namespace xios 
    1111{ 
    12   CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id) : Id_(Id), finalizeSignal_(false) 
     12  CPoolRessource::CPoolRessource(MPI_Comm poolComm, const std::string& Id, bool isServer) : Id_(Id), finalizeSignal_(false) 
    1313  { 
    1414    int commRank, commSize ; 
     
    2323      int globalLeaderRank ; 
    2424      MPI_Comm_rank(CXios::getXiosComm(),&globalLeaderRank) ; 
    25       CXios::getRessourcesManager()->registerPool(Id, commSize, globalLeaderRank) ; 
     25      if (isServer) CXios::getRessourcesManager()->registerPoolServer(Id, commSize, globalLeaderRank) ; 
    2626    } 
    2727     
  • XIOS3/trunk/src/manager/pool_ressource.hpp

    r2404 r2458  
    2727 
    2828    public: 
    29     CPoolRessource(MPI_Comm poolComm, const std::string& Id) ; 
     29    CPoolRessource(MPI_Comm poolComm, const std::string& Id, bool isServer) ; 
    3030    ~CPoolRessource() ; 
    3131     
  • XIOS3/trunk/src/manager/ressources_manager.cpp

    r2260 r2458  
    22#include "server.hpp" 
    33#include "servers_ressource.hpp" 
     4#include "token_manager.hpp" 
    45#include "timer.hpp" 
    56 
     
    2122    if (commRank==0 && isXiosServer) MPI_Comm_rank(xiosComm_, &commRank) ;  
    2223    else commRank=0 ; 
     24    tokenManager_ = new CTokenManager(xiosComm_,commRank) ; 
     25 
    2326    MPI_Allreduce(&commRank, &managerGlobalLeader_, 1, MPI_INT, MPI_SUM, xiosComm_) ; 
    2427 
     
    163166    buffer.realloc(maxBufferSize_) ; 
    164167     
    165     buffer<<serverLeader_ ;   
     168    buffer<<ressourcesSize_<<freeRessourcesSize_<<serverLeader_ ;   
    166169    buffer<<(int) pools_.size(); 
    167170    for(auto it=pools_.begin();it!=pools_.end(); ++it) 
     
    169172      auto key = it->first ; 
    170173      auto val = it->second ;  
    171       buffer << key<<std::get<0>(val) << std::get<1>(val)  ; 
     174      buffer << key<<std::get<0>(val)  << std::get<1>(val)  << std::get<2>(val); 
    172175    } 
    173176  } 
     
    177180    std::string poolId ; 
    178181    int size ; 
     182    int freeSize ; 
    179183    int leader ; 
    180184    
    181     buffer>>serverLeader_ ; 
     185    buffer>>ressourcesSize_>>freeRessourcesSize_>>serverLeader_ ; 
    182186    pools_.clear() ; 
    183187    int nbPools ; 
     
    185189    for(int i=0;i<nbPools;i++)  
    186190    { 
    187       buffer>>poolId>>size>>leader ; 
    188       pools_[poolId]=std::make_tuple(size,leader) ; 
     191      buffer>>poolId>>size>>freeSize>>leader ; 
     192      pools_[poolId]=std::make_tuple(size, freeSize, leader) ; 
    189193    } 
    190194  } 
     
    210214 
    211215  
    212   void CRessourcesManager::registerPool(const string& poolId, int size, int leader) 
    213   { 
    214     winRessources_->lockWindow(managerGlobalLeader_,0) ; 
    215     winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 
    216     pools_[poolId] = make_tuple(size,leader) ; 
     216  void CRessourcesManager::registerPoolClient(const string& poolId, int size, int leader) 
     217  { 
     218    winRessources_->lockWindow(managerGlobalLeader_,0) ; 
     219    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 
     220    pools_[poolId] = make_tuple(size, size, leader) ; 
     221    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ; 
     222    winRessources_->unlockWindow(managerGlobalLeader_,0) ;     
     223  } 
     224 
     225  void CRessourcesManager::registerPoolServer(const string& poolId, int size, int leader) 
     226  { 
     227    winRessources_->lockWindow(managerGlobalLeader_,0) ; 
     228    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 
     229    pools_[poolId] = make_tuple(size, size, leader) ; 
    217230    freeRessourcesSize_-=size ; 
    218231    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ; 
     
    220233  } 
    221234 
    222  
    223   bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& leader) 
     235  bool CRessourcesManager::getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader) 
    224236  { 
    225237    winRessources_->lockWindow(managerGlobalLeader_,0) ; 
     
    232244    { 
    233245      size=get<0>(it->second) ; 
    234       leader=get<1>(it->second) ; 
     246      freeSize=get<1>(it->second) ; 
     247      leader=get<2>(it->second) ; 
    235248      return true ; 
    236249    } 
     250  } 
     251 
     252  bool CRessourcesManager::decreasePoolFreeSize(const string& poolId, int size) 
     253  { 
     254    bool ret ; 
     255 
     256    winRessources_->lockWindow(managerGlobalLeader_,0) ; 
     257    winRessources_->updateFromWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpIn) ; 
     258    
     259 
     260    auto it=pools_.find(poolId) ; 
     261     
     262    if ( it == pools_.end()) ret=false ; 
     263    else  
     264    { 
     265      get<1>(it->second)-=size ; 
     266      ret=true ; 
     267    } 
     268    winRessources_->updateToWindow(managerGlobalLeader_, this, &CRessourcesManager::ressourcesDumpOut) ; 
     269    winRessources_->unlockWindow(managerGlobalLeader_,0) ;  
     270 
     271    return ret ;    
    237272  } 
    238273 
     
    257292  bool CRessourcesManager::getPoolLeader(const string& poolId, int& leader) 
    258293  { 
    259     int size ; 
    260     return getPoolInfo(poolId, size, leader) ; 
     294    int size, freeSize ; 
     295    return getPoolInfo(poolId, size, freeSize, leader) ; 
    261296  } 
    262297 
    263298  bool CRessourcesManager::getPoolSize(const string& poolId, int& size) 
    264299  { 
    265     int leader ; 
    266     return getPoolInfo(poolId, size, leader) ; 
     300    int leader,freeSize ; 
     301    return getPoolInfo(poolId, size, freeSize, leader) ; 
     302  } 
     303 
     304  bool CRessourcesManager::getPoolFreeSize(const string& poolId, int& freeSize) 
     305  { 
     306    int leader,size ; 
     307    return getPoolInfo(poolId, size, freeSize, leader) ; 
    267308  } 
    268309 
    269310  bool CRessourcesManager::hasPool(const string& poolId) 
    270311  { 
    271     int leader,size ; 
    272     return getPoolInfo(poolId, size, leader) ; 
     312    int leader,size,freeSize ; 
     313    return getPoolInfo(poolId, size, freeSize, leader) ; 
     314  } 
     315 
     316  void CRessourcesManager::waitPoolRegistration(const string& poolId) 
     317  { 
     318    while(!hasPool(poolId)) CXios::getDaemonsManager()->servicesEventLoop() ; 
    273319  } 
    274320} 
  • XIOS3/trunk/src/manager/ressources_manager.hpp

    r2260 r2458  
    1111#include "window_manager.hpp" 
    1212#include "pool_ressource.hpp" 
     13#include "token_manager.hpp" 
    1314 
    1415 
     
    4546    int  getRessourcesSize(void) ; 
    4647    int  getFreeRessourcesSize(void) ; 
    47     bool getPoolInfo(const string& poolId, int& size, int& leader) ; 
     48    bool getPoolInfo(const string& poolId, int& size, int& freeSize, int& leader) ; 
    4849    bool getPoolLeader(const string& poolId, int& leader) ; 
    4950    bool getPoolSize(const string& poolId, int& size) ; 
     51    bool getPoolFreeSize(const string& poolId, int& freeSize) ; 
    5052    bool hasPool(const string& poolId) ; 
     53    bool decreasePoolFreeSize(const string& poolId, int size) ; 
     54    void waitPoolRegistration(const string& poolId) ; 
     55     
    5156 
    5257    void registerServerLeader(int leaderRank) ; 
    5358    void registerRessourcesSize(int size) ; 
    54     void registerPool(const std::string& poolId,int size,int leader) ; 
     59    void registerPoolClient(const std::string& poolId,int size,int leader) ; 
     60    void registerPoolServer(const std::string& poolId,int size,int leader) ; 
     61    CTokenManager* getTokenManager(void) {return tokenManager_ ;}  
    5562 
    5663    int managerGlobalLeader_ ; 
     
    5966 
    6067    CWindowManager* winNotify_ ; 
     68    CTokenManager* tokenManager_ ; 
    6169 
    6270    const size_t maxBufferSize_=1024*1024 ; 
     
    6775    tuple<std::string, int> notifyCreatePool_ ; 
    6876 
    69     std::map<std::string, std::tuple<int,int>> pools_ ; 
     77    std::map<std::string, std::tuple<int,int,int>> pools_ ; 
    7078    int serverLeader_ ; 
    7179    int ressourcesSize_ ; 
  • XIOS3/trunk/src/manager/servers_ressource.cpp

    r2274 r2458  
    5050    bool isPartOf ; 
    5151 
    52     for(int i=0; i<freeRessourcesRank_.size();i++)  
     52    for(int i=0, j=0; i<freeRessourcesRank_.size();i++)  
    5353    { 
    5454       if (i<size) isPartOf=true ; 
     
    5656       { 
    5757         isPartOf=false ; 
    58          newFreeRessourcesRank[i]=freeRessourcesRank_[i] ; 
     58         newFreeRessourcesRank[j]=freeRessourcesRank_[i] ; 
     59         j++ ; 
    5960       } 
    6061        
     
    165166    if (isPartOf) 
    166167    {   
    167       poolRessource_ = new CPoolRessource(poolComm, poolId) ; 
     168      poolRessource_ = new CPoolRessource(poolComm, poolId, true) ; 
    168169      MPI_Comm_free(&poolComm) ; 
    169170    } 
  • XIOS3/trunk/src/manager/services_manager.cpp

    r2404 r2458  
    5454 
    5555    int leader ; 
    56     int poolSize ; 
     56    int poolSize, poolFreeSize ; 
    5757     
    5858    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 
    59     bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
     59    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 
    6060    if (wait) 
    6161    { 
     
    6363      { 
    6464        CXios::getDaemonsManager()->eventLoop() ; 
    65         ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
     65        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 
    6666      } 
    6767    } 
     
    7070    { 
    7171      info(40)<<"CServicesManager : create service notification to leader "<<leader<<", serviceId : "<<serviceId<<", size : "<<size<<endl ; 
     72      CXios::getRessourcesManager()->decreasePoolFreeSize(poolId ,size) ; 
    7273      createServicesNotify(leader, serviceId, type, size, nbPartitions) ; 
    7374      return true ; 
     
    8182    int leader ; 
    8283    int poolSize ; 
     84    int poolFreeSize ; 
    8385     
    8486    info(40)<<"CServicesManager : waiting for pool info : "<<poolId<<endl ; ; 
    85     bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
     87    bool ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 
    8688    if (wait) 
    8789    { 
     
    8991      { 
    9092        CXios::getDaemonsManager()->eventLoop() ; 
    91         ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, leader) ; 
     93        ok=CXios::getRessourcesManager()->getPoolInfo(poolId, poolSize, poolFreeSize, leader) ; 
    9294      } 
    9395    } 
     
    251253 
    252254  bool CServicesManager::getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type,  
    253                                         int& size, int& nbPartitions, int& leader) 
     255                                        int& size, int& nbPartitions, int& leader, bool wait) 
    254256  { 
    255257     
     
    259261 
    260262    auto it=services_.find(std::tuple<std::string,std::string,int>(poolId,serviceId,partitionId)) ; 
    261     if ( it == services_.end()) return false ; 
     263    if ( it == services_.end() && !wait) return false ; 
    262264    else 
    263265    { 
     266      if (wait) waitServiceRegistration(poolId, serviceId, partitionId) ; 
    264267      type= std::get<0>(it->second);  
    265268      size= std::get<1>(it->second);  
     
    270273  } 
    271274 
    272   bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader) 
     275  bool CServicesManager::getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader, bool wait) 
    273276  { 
    274277    int type; 
     
    278281  } 
    279282 
    280   bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type) 
     283  bool CServicesManager::getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, bool wait) 
    281284  { 
    282285    int size ; 
     
    286289  } 
    287290 
    288   bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions) 
     291  bool CServicesManager::getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartitions, bool wait) 
    289292  { 
    290293    int size ; 
     
    303306    else return true ; 
    304307  } 
     308  
     309  void CServicesManager::waitServiceRegistration(const std::string& poolId, const std::string& serviceId, const int& partitionId) 
     310  { 
     311    while(!hasService(poolId,serviceId,partitionId)) CXios::getDaemonsManager()->servicesEventLoop() ; 
     312  } 
    305313 
    306314} 
  • XIOS3/trunk/src/manager/services_manager.hpp

    r2404 r2458  
    4343     
    4444    void registerService(const std::string& poolId, const std::string& serviceId, const int& partitionId, int type, int size, int nbPartitions, int leader) ; 
    45     bool getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, int& size, int& nbPartition, int& leader) ; 
    46     bool getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader) ; 
    47     bool getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type) ; 
    48     bool getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartition) ; 
     45    bool getServiceInfo(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, int& size, int& nbPartition, int& leader, bool wait=false) ; 
     46    bool getServiceLeader(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& leader, bool wait=false) ; 
     47    bool getServiceType(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& type, bool wait=false) ; 
     48    bool getServiceNbPartitions(const std::string& poolId, const std::string& serviceId, const int& partitionId, int& nbPartition, bool wait=false) ; 
    4949    bool hasService(const std::string& poolId, const std::string& serviceId, const int& partitionId) ; 
     50    void waitServiceRegistration(const std::string& poolId, const std::string& serviceId, const int& partitionId); 
    5051    void servicesDumpOut(CBufferOut& buffer) ; 
    5152    void servicesDumpIn(CBufferIn& buffer) ; 
     53    int  getRessourcesSize(const std::string& poolId) ; 
     54    int  getFreeRessourcesSize(const std::string& poolId) ; 
    5255 
    5356    private: 
  • XIOS3/trunk/src/node/context.cpp

    r2441 r2458  
    577577    if (commRank==0) 
    578578    { 
    579       CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions) ; 
     579      CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 
    580580      for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 
    581581    } 
     
    588588      parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 
    589589      int type ;  
    590       if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type) ; 
     590      if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 
    591591      MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
    592592      string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; 
     
    610610  CATCH_DUMP_ATTR 
    611611   
     612   
     613  // obsolete 
    612614  void CContext::createServerInterComm(void)  
    613615  TRY 
     
    645647  } 
    646648  CATCH_DUMP_ATTR 
     649 
     650   
     651  void CContext::getServerInterComm(const string& poolId, const string& serviceId,  vector<pair<CContextClient*,CContextServer*>>& clientServers) 
     652  { 
     653    vector<pair<string, pair<CContextClient*,CContextServer*>>> retClientServers ; 
     654 
     655    auto it=serversMap_.find(make_pair(poolId,serviceId)) ; 
     656    if (it!=serversMap_.end()) clientServers=it->second ; 
     657    else 
     658    { 
     659      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 
     660      else createServerInterComm(poolId, serviceId, retClientServers) ; 
     661      for(auto& retClientServer : retClientServers)  clientServers.push_back(retClientServer.second) ; 
     662       
     663      int serviceType ; 
     664      if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType) ; 
     665      MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 
     666       
     667      for(auto& clientServer : clientServers)      
     668      { 
     669        if (serviceType==CServicesManager::WRITER) { writerClientOut_.push_back(clientServer.first)      ; writerServerOut_.push_back(clientServer.second) ; } 
     670        else if (serviceType==CServicesManager::READER) { readerClientOut_.push_back(clientServer.first) ; readerServerOut_.push_back(clientServer.second) ; } 
     671        else if (serviceType==CServicesManager::GATHERER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 
     672      } 
     673      serversMap_.insert(make_pair(make_pair(poolId,serviceId),clientServers)) ; 
     674    } 
     675 
     676  } 
     677 
     678  vector<CContextClient*> CContext::getContextClient(const string& poolId, const string& serviceId) 
     679  { 
     680    vector<pair<CContextClient*,CContextServer*>> clientServers ; 
     681    getServerInterComm(poolId, serviceId, clientServers ) ; 
     682    vector<CContextClient*> ret ; 
     683    for(auto& clientServer : clientServers) ret.push_back(clientServer.first) ; 
     684    return ret ; 
     685  } 
     686 
    647687 
    648688  void CContext::globalEventLoop(void) 
     
    698738      for(auto couplerOut : couplerOutClient_) couplerOut.second->eventLoop(); 
    699739      for(auto couplerIn : couplerInClient_) couplerIn.second->eventLoop(); 
    700       for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 
    701       for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 
     740      //for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(enableEventsProcessing); 
     741      //for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(enableEventsProcessing); 
     742      for(auto couplerOut : couplerOutServer_) couplerOut.second->eventLoop(); 
     743      for(auto couplerIn : couplerInServer_) couplerIn.second->eventLoop(); 
    702744    } 
    703745    setCurrent(getId()) ; 
     
    783825        CContextServer* server ; 
    784826 
     827        /* 
    785828        if (writerClientOut_.size()!=0) 
    786829        { 
     
    803846          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
    804847        } 
     848        */ 
     849 
     850        for(int n=0; n<writerClientOut_.size() ; n++) 
     851        { 
     852          client=writerClientOut_[n] ; 
     853          server=writerServerOut_[n] ; 
     854 
     855          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 
     856          client->finalize(); 
     857          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 
     858          bool bufferReleased; 
     859          do 
     860          { 
     861            client->eventLoop(); 
     862            bufferReleased = !client->havePendingRequests(); 
     863          } while (!bufferReleased); 
     864          info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 
     865 
     866          bool notifiedFinalized=false ; 
     867          do 
     868          { 
     869            notifiedFinalized=client->isNotifiedFinalized() ; 
     870          } while (!notifiedFinalized) ; 
     871          server->releaseBuffers(); 
     872          client->releaseBuffers(); 
     873          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
     874        } 
     875         
    805876 
    806877        if (readerClientOut_.size()!=0) 
     
    879950 
    880951    
     952   void CContext::setDefaultServices(void) 
     953   { 
     954     defaultPoolWriterId_ = CXios::defaultPoolId ; 
     955     defaultPoolReaderId_ = CXios::defaultPoolId ; 
     956     defaultPoolGathererId_ = CXios::defaultPoolId ; 
     957     defaultWriterId_ = CXios::defaultWriterId ; 
     958     defaultReaderId_ = CXios::defaultReaderId ; 
     959     defaultGathererId_ = CXios::defaultGathererId ; 
     960     defaultUsingServer2_ = CXios::usingServer2 ; 
     961      
     962     if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
     963     if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
     964     if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
     965     if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
     966     if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
     967     if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
     968     if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
     969     if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     970   } 
     971 
    881972   /*! 
    882973   \brief Close all the context defintion and do processing data 
     
    894985 
    895986     CTimer::get("Context : close definition").resume() ; 
    896       
     987           
    897988     // create intercommunicator with servers.  
    898989     // not sure it is the good place to be called here  
    899      createServerInterComm() ; 
     990     //createServerInterComm() ; 
    900991 
    901992 
     
    9161007    // pour chacun des contextes. 
    9171008    solveDescInheritance(true); 
    918   
     1009    setDefaultServices() ; 
    9191010    // Check if some axis, domains or grids are eligible to for compressed indexed output. 
    9201011    // Warning: This must be done after solving the inheritance and before the rest of post-processing 
     
    10231114 
    10241115    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 
    1025     if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles); 
    1026     //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 
     1116     
     1117    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles) ; 
     1118    /* 
     1119    if (serviceType_==CServicesManager::CLIENT ) 
     1120    {    
     1121      if (CXios::usingServer2) distributeFiles(this->enabledWriteModeFiles, defaultPoolGathererId_, defaultGathererId_); 
     1122      else distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1123    } 
     1124    if (serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1125    */ 
    10271126 
    10281127    // client side, assign context for file reading 
    1029     if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
    1030      
     1128//    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
     1129    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles)  
     1130    { 
     1131      string poolReaderId ; 
     1132      string readerId ; 
     1133      file->getReaderServicesId(defaultPoolReaderId_, defaultReaderId_, poolReaderId, readerId) ; 
     1134      file->setContextClient(poolReaderId, readerId, 0) ; 
     1135    } 
     1136 
    10311137    // server side, assign context where to send file data read 
    10321138    if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; 
     
    11631269    // fix size for each context client 
    11641270    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 
     1271 
    11651272 
    11661273     CTimer::get("Context : close definition").suspend() ; 
     
    14361543   TRY 
    14371544   { 
     1545     map< pair<string,string>, vector<CFile*>> fileMaps ; 
     1546     for(auto& file : files) 
     1547     { 
     1548       string poolWriterId ; 
     1549       string poolGathererId ; 
     1550       string writerId  ; 
     1551       string gathererId  ; 
     1552       bool usingServer2 ; 
     1553 
     1554       file->getWriterServicesId(defaultUsingServer2_, defaultPoolWriterId_, defaultWriterId_, defaultPoolGathererId_, defaultGathererId_, 
     1555                                 usingServer2, poolWriterId, writerId, poolGathererId, gathererId) ; 
     1556       if (serviceType_==CServicesManager::CLIENT && usingServer2) fileMaps[make_pair(poolGathererId,gathererId)].push_back(file) ; 
     1557       else fileMaps[make_pair(poolWriterId,writerId)].push_back(file) ; 
     1558     } 
     1559     for(auto& it : fileMaps) distributeFilesOnSameService(it.second, it.first.first, it.first.second) ; 
     1560   } 
     1561   CATCH_DUMP_ATTR 
     1562 
     1563 
     1564   void CContext::distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1565   TRY 
     1566   { 
    14381567     bool distFileMemory=false ; 
    14391568     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 
    14401569 
    1441      int nbPools = writerClientOut_.size(); 
    1442      if (nbPools==1) distributeFileOverOne(files) ; 
    1443      else if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 
    1444      else distributeFileOverBandwith(files) ; 
    1445    } 
    1446    CATCH_DUMP_ATTR 
    1447  
    1448    void CContext::distributeFileOverOne(const vector<CFile*>& files) 
    1449    TRY 
    1450    { 
    1451     for(auto& file : files) file->setContextClient(writerClientOut_[0]) ; 
    1452    } 
    1453    CATCH_DUMP_ATTR 
    1454  
    1455    void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 
     1570     auto writers = getContextClient(poolId, serviceId) ; 
     1571     int  nbPools = writers.size() ; 
     1572      
     1573     if (nbPools==1) distributeFileOverOne(files, poolId, serviceId) ; 
     1574     else if (distFileMemory) distributeFileOverMemoryBandwith(files, poolId, serviceId) ; 
     1575     else distributeFileOverBandwith(files, poolId, serviceId) ; 
     1576   } 
     1577   CATCH_DUMP_ATTR 
     1578 
     1579   void CContext::distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1580   TRY 
     1581   { 
     1582     for(auto& file : files) file->setContextClient(poolId, serviceId,0) ; 
     1583   } 
     1584   CATCH_DUMP_ATTR 
     1585 
     1586   void CContext::distributeFileOverBandwith(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
    14561587   TRY 
    14571588   { 
     
    14591590      
    14601591     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 
    1461      int nbPools = writerClientOut_.size(); 
     1592     auto writers = getContextClient(poolId, serviceId) ; 
     1593     int nbPools = writers.size(); 
     1594     //int nbPools = writerClientOut_.size(); 
    14621595 
    14631596     // (1) Find all enabled files in write mode 
     
    15121645       dataSize=(*poolDataSize.begin()).first ; 
    15131646       j=(*poolDataSize.begin()).second ; 
    1514        dataSizeMap[i].second->setContextClient(writerClientOut_[j]); 
     1647       dataSizeMap[i].second->setContextClient(poolId, serviceId, j); 
    15151648       dataSize+=dataSizeMap[i].first; 
    15161649       poolDataSize.erase(poolDataSize.begin()) ; 
     
    15221655   CATCH_DUMP_ATTR 
    15231656 
    1524    void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 
    1525    TRY 
    1526    { 
    1527      int nbPools = writerClientOut_.size(); 
     1657   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList, const string& poolId, const string& serviceId) 
     1658   TRY 
     1659   { 
     1660     auto writers = getContextClient(poolId, serviceId) ; 
     1661     int nbPools = writers.size(); 
     1662     
    15281663     double ratio=0.5 ; 
    15291664     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 
     
    15911726         } 
    15921727       } 
    1593        filesList[i]->setContextClient(writerClientOut_[files[i].assignedServer_]) ; 
     1728       filesList[i]->setContextClient(poolId, serviceId, files[i].assignedServer_) ; 
    15941729       delete [] files[i].assignedGrid_ ; 
    15951730     } 
  • XIOS3/trunk/src/node/context.hpp

    r2407 r2458  
    103103          
    104104         void initServer(MPI_Comm intraComm, int serviceType ); 
    105          void createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer)  ; 
    106   
    107          void createServerInterComm(void)  ; 
    108          void createServerInterComm_old(void)  ; 
    109          void createServerInterComm(const string& poolId, const string& serverId, vector<pair<string, pair<CContextClient*,CContextServer*>>>& clientServers ) ; 
    110   
    111  
     105          
    112106         bool isInitialized(void); 
    113107 
     
    123117 
    124118         bool isFinalized(void); 
    125  
    126119         void closeDefinition(void); 
    127120 
     
    159152 
    160153         // Distribute files (in write mode) among secondary-server pools according to the estimated data flux 
    161          void distributeFiles(const std::vector<CFile*>& files); 
    162          void distributeFileOverOne(const vector<CFile*>& files) ; //!< Distribute files over one single server (no distribution) 
    163          void distributeFileOverBandwith(const std::vector<CFile*>& files) ; //!< Distribute files overs servers to balance the I/O bandwith 
    164          void distributeFileOverMemoryBandwith(const std::vector<CFile*>& files) ; //!< Distribute files overs servers to minimize the memory consumption 
     154         void distributeFiles(const vector<CFile*>& files) ; 
     155         void distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) ; 
     156         void distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files over one single server (no distribution) 
     157         void distributeFileOverBandwith(const std::vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files overs servers to balance the I/O bandwith 
     158         void distributeFileOverMemoryBandwith(const std::vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files overs servers to minimize the memory consumption 
    165159          
    166160       public: 
     
    271265         bool setProcessingEvent(void) {isProcessingEvent_=true ;} 
    272266         bool unsetProcessingEvent(void) {isProcessingEvent_=false ;} 
    273          MPI_Comm getIntraComm(void) {return intraComm_ ;} 
    274          int getIntraCommRank(void) {return intraCommRank_;} 
    275          int getIntraCommSize(void) {return intraCommSize_;} 
    276  
     267          
    277268         void addCouplingChanel(const std::string& contextId, bool out) ; 
    278269 
     
    310301 
    311302      private: 
    312         std::string defaultReaderId ; 
    313         std::string defaultWriterId ; 
    314         std::string defaultGathererId ; 
     303        std::string defaultPoolWriterId_ ; 
     304        std::string defaultPoolReaderId_ ; 
     305        std::string defaultPoolGathererId_ ; 
     306        std::string defaultWriterId_ ; 
     307        std::string defaultReaderId_ ; 
     308        std::string defaultGathererId_ ; 
     309        bool defaultUsingServer2_ ; 
     310        void setDefaultServices(void) ; 
     311 
     312 
     313        std::map<std::pair<string,string>,std::vector<pair<CContextClient*,CContextServer*>>> serversMap_ ; 
    315314 
    316315        std::vector<CContextClient*> writerClientOut_ ; 
     
    342341        std::map<std::string, CContextServer*> couplerInServer_ ; 
    343342      public: 
     343         void createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer)  ; 
     344         void createServerInterComm(void)  ; // obsolete 
     345         void createServerInterComm_old(void)  ; 
     346         void createServerInterComm(const string& poolId, const string& serverId, vector<pair<string, pair<CContextClient*,CContextServer*>>>& clientServers ) ; 
     347         void getServerInterComm(const string& poolId, const string& serviceId,  vector<pair<CContextClient*,CContextServer*>>& clientServers) ; 
     348         vector<CContextClient*> getContextClient(const string& poolId, const string& serviceId) ; 
    344349         CContextClient* getCouplerInClient(const string& contextId) { return couplerInClient_[contextId] ;} 
    345350         CContextServer* getCouplerInServer(const string& contextId) { return couplerInServer_[contextId] ;} 
    346351         CContextClient* getCouplerOutClient(const string& contextId) { return couplerOutClient_[contextId] ;} 
    347352         CContextServer* getCouplerOutServer(const string& contextId) { return couplerOutServer_[contextId] ;} 
    348  
    349          CRegistry* registryIn=nullptr ;    //!< input registry which is read from file 
    350          CRegistry* registryOut=nullptr ;   //!< output registry which will be written into file at the finalize 
     353        
     354       public: // must be privatize using accessors 
     355         
     356        CRegistry* registryIn=nullptr ;    //!< input registry which is read from file 
     357        CRegistry* registryOut=nullptr ;   //!< output registry which will be written into file at the finalize 
    351358 
    352359 
     
    354361        int intraCommRank_ ; //! context intra communicator rank 
    355362        int intraCommSize_ ; //! context intra communicator size 
    356          
     363       public:  
     364        MPI_Comm getIntraComm(void) {return intraComm_ ;} 
     365        int getIntraCommRank(void) {return intraCommRank_;} 
     366        int getIntraCommSize(void) {return intraCommSize_;} 
    357367      private: 
    358368         shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context 
  • XIOS3/trunk/src/node/file.cpp

    r2409 r2458  
    2626      : CObjectTemplate<CFile>(), CFileAttributes() 
    2727      , vFieldGroup(), data_out(), enabledFields(), fileComm(MPI_COMM_NULL) 
    28       , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 
     28      , isOpen(false), checkRead(false), allZoneEmpty(false) 
    2929   { 
    3030     setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); 
     
    3535      : CObjectTemplate<CFile>(id), CFileAttributes() 
    3636      , vFieldGroup(), data_out(), enabledFields(), fileComm(MPI_COMM_NULL) 
    37       , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 
     37      , isOpen(false), checkRead(false), allZoneEmpty(false) 
    3838    { 
    3939      setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); 
     
    11171117   CATCH_DUMP_ATTR 
    11181118 
     1119   void CFile::getWriterServicesId(bool defaultUsingServer2_, const string& defaultPoolWriterId_, const string& defaultWriterId_, const string& defaultPoolGathererId_, const string& defaultGathererId_, 
     1120                                   bool& usingServer2, string& poolWriterId, string& writerId, string& poolGathererId, string& gathererId) 
     1121   { 
     1122     usingServer2 = defaultUsingServer2_ ; 
     1123     poolWriterId = defaultPoolWriterId_ ; 
     1124     writerId = defaultWriterId_ ; 
     1125     poolGathererId = defaultPoolGathererId_ ; 
     1126     gathererId = defaultGathererId_ ; 
     1127 
     1128     if (!using_server2.isEmpty()) usingServer2 = using_server2; 
     1129     if (!pool_writer.isEmpty()) poolWriterId = pool_writer ; 
     1130     if (!writer.isEmpty()) writerId = writer; 
     1131     if (!pool_gatherer.isEmpty()) poolGathererId = pool_gatherer; 
     1132     if (!gatherer.isEmpty()) gathererId = gatherer; 
     1133   } 
     1134 
     1135   void CFile::getReaderServicesId(const string& defaultPoolReaderId_, const string& defaultReaderId_, string& poolReaderId, string& readerId) 
     1136   { 
     1137     poolReaderId = defaultPoolReaderId_ ; 
     1138     readerId = defaultReaderId_ ; 
     1139     
     1140     if (!pool_reader.isEmpty()) poolReaderId = pool_reader ; 
     1141     if (!reader.isEmpty()) readerId = reader; 
     1142   } 
     1143 
     1144   void CFile::setContextClient(const string& defaultPoolId, const string& defaultServiceId, int partitionId) 
     1145   TRY 
     1146   { 
     1147     CContext* context = CContext::getCurrent(); 
     1148     vector<CContextClient*> clients = context->getContextClient(defaultPoolId, defaultServiceId) ; 
     1149     setContextClient(clients[partitionId]) ; 
     1150   } 
     1151   CATCH_DUMP_ATTR 
     1152 
     1153 
    11191154   void CFile::setContextClient(CContextClient* newContextClient) 
    11201155   TRY 
     
    11361171   CATCH_DUMP_ATTR 
    11371172 
    1138    void CFile::setReadContextClient(CContextClient* readContextclient) 
    1139    TRY 
    1140    { 
    1141      read_client = readContextclient; 
    1142    } 
    1143    CATCH_DUMP_ATTR 
    1144  
    1145    CContextClient* CFile::getReadContextClient() 
    1146    TRY 
    1147    { 
    1148      return read_client; 
    1149    } 
    1150    CATCH_DUMP_ATTR 
    1151  
     1173    
    11521174   /*! 
    11531175   \brief Send a message to create a field on server side 
  • XIOS3/trunk/src/node/file.hpp

    r2326 r2458  
    128128         CVariable* addVariable(const string& id = ""); 
    129129         CVariableGroup* addVariableGroup(const string& id = ""); 
     130          
     131          
     132         void getWriterServicesId(bool defaultUsingServer2_, const string& defaultPoolWriterId_, const string& defaultWriterId_, const string& defaultPoolGathererId_, const string& defaultGathererId_, 
     133                                  bool& usingServer2, string& poolWriterId, string& writerId, string& poolGathererId, string& gathererId) ; 
     134         void getReaderServicesId(const string& defaultPoolReaderId_, const string& defaultReaderId_, string& poolReaderId, string& readerId) ; 
     135  
     136 
     137         void setContextClient(const string& defaultPoolId, const string& defaultServiceId, int partitionId) ; 
    130138         void setContextClient(CContextClient* newContextClient); 
    131139         CContextClient* getContextClient(); 
    132  
    133          void setReadContextClient(CContextClient* newContextClient); 
    134          CContextClient* getReadContextClient(); 
    135140 
    136141         // Send info to server          
     
    192197         /// Propriétés privées /// 
    193198         CContextClient* client; 
    194          CContextClient* read_client; // Context client for reading (channel between server 1 and client) 
    195199         CFieldGroup* vFieldGroup; 
    196200         CVariableGroup* vVariableGroup; 
  • XIOS3/trunk/src/node/node_enum.hpp

    r2408 r2458  
    4444         eExtractDomain, 
    4545         ePoolNode,gPoolNode, 
     46         eServiceNode,gServiceNode, 
    4647//         eService, gService 
    4748       } ENodeType; 
  • XIOS3/trunk/src/node/node_type.hpp

    r2408 r2458  
    3434#include "extract_domain.hpp" 
    3535#include "pool_node.hpp" 
     36#include "service_node.hpp" 
    3637 
    3738 
  • XIOS3/trunk/src/node/pool_node.cpp

    r2408 r2458  
    11#include "pool_node.hpp" 
     2#include "cxios.hpp" 
     3#include<cmath> 
    24 
    35namespace xios 
     
    57   
    68  CPoolNode::CPoolNode(void) : CObjectTemplate<CPoolNode>(), CPoolNodeAttributes() 
    7   { /* Ne rien faire de plus */ } 
     9  {  
     10    setVirtualServiceNodeGroup(CServiceNodeGroup::create(getId() + "_virtual_service_node_group")); 
     11  } 
    812 
    913  CPoolNode::CPoolNode(const StdString & id) : CObjectTemplate<CPoolNode>(id), CPoolNodeAttributes() 
    10   { /* Ne rien faire de plus */ } 
     14  {  
     15    setVirtualServiceNodeGroup(CServiceNodeGroup::create(getId() + "_virtual_service_node_group")); 
     16  } 
    1117 
    1218  CPoolNode::~CPoolNode(void) 
     
    1723  { 
    1824    SuperClass::parse(node); 
     25    if (node.goToChildElement()) 
     26    { 
     27      do 
     28      { 
     29        if (node.getElementName()=="service" || node.getElementName()=="service_group") this->getVirtualServiceNodeGroup()->parseChild(node); 
     30      } while (node.goToNextElement()); 
     31      node.goToParentElement(); 
     32    } 
    1933  } 
    2034 
    21  
     35  void CPoolNode::allocateRessources(void) 
     36  { 
     37    int nonEmpty=0 ; 
     38    if (!nprocs.isEmpty()) nonEmpty++ ; 
     39    if (!global_fraction.isEmpty()) nonEmpty++ ; 
     40    if (!remain_fraction.isEmpty()) nonEmpty++ ; 
     41    if (!remain.isEmpty()) nonEmpty++ ; 
     42    if (nonEmpty==0) ERROR("void CPoolNode::allocateRessources(void)",<<"A number a ressource for allocate pool must be specified."  
     43                           <<"At least attributes, <nprocs> or <global_fraction> or <remain_fraction> or <remain> must be specified") 
     44    else if (nonEmpty>1) ERROR("void CPoolNode::allocateRessources(void)",<<"Only one of these attributes : <nprocs> or <global_fraction>" 
     45                               <<" or <remain_fraction> or <remain> must be specified to determine allocated ressouces." 
     46                               <<" More than one is currently specified") 
     47    auto ressourcesManager=CXios::getRessourcesManager() ; 
     48    int nbRessources ; 
     49    int globalRessources = ressourcesManager->getRessourcesSize() ; 
     50    int freeRessources =   ressourcesManager->getFreeRessourcesSize() ; 
     51    if (!nprocs.isEmpty()) nbRessources = nprocs ; 
     52    if (!global_fraction.isEmpty()) nbRessources = std::round(globalRessources * global_fraction) ; 
     53    if (!remain_fraction.isEmpty()) nbRessources = std::round(freeRessources * remain_fraction) ; 
     54    if (!remain.isEmpty()) nbRessources = freeRessources ; 
     55    if (nbRessources>freeRessources) 
     56      ERROR("void CPoolNode::allocateRessources(void)",<<"Cannot allocate required ressources for the pool." 
     57                                                       <<"  Required is : "<<nbRessources<<"  but free ressource is currently : "<<freeRessources) 
     58    string poolId ; 
     59    if (!name.isEmpty()) poolId=name ; 
     60    else if (!hasAutoGeneratedId() ) poolId=getId() ; 
     61    else ERROR("void CPoolNode::allocateRessources(void)",<<"Pool has no name or id, attributes <id> or <name> must be specified") 
     62    ressourcesManager->createPool(poolId, nbRessources) ; 
     63    ressourcesManager->waitPoolRegistration(poolId) ; 
     64    auto services=this->getAllServiceNodes() ; 
     65    for(auto& service : services) service->allocateRessources(poolId) ; 
     66  } 
    2267} 
    2368 
  • XIOS3/trunk/src/node/pool_node.hpp

    r2408 r2458  
    1010#include "group_factory.hpp" 
    1111#include "declare_group.hpp" 
     12#include "service_node.hpp" 
    1213 
    1314 
     
    5859    public: 
    5960      virtual void parse(xml::CXMLNode & node); 
     61      void allocateRessources(void) ; 
     62 
     63    private: 
     64      std::vector<CServiceNode*> getAllServiceNodes(void) const {return this->vServiceNodeGroup->getAllChildren();} 
     65      CServiceNodeGroup* getVirtualServiceNodeGroup(void) const {return this->vServiceNodeGroup; } 
     66      void setVirtualServiceNodeGroup(CServiceNodeGroup* newVServiceNodeGroup) { this->vServiceNodeGroup = newVServiceNodeGroup; } 
     67      CServiceNodeGroup* vServiceNodeGroup; 
    6068 
    6169  }; // class CPoolNode 
  • XIOS3/trunk/src/object_factory_decl2.cpp

    r2408 r2458  
    2222  macro(CExtractDomain) 
    2323  macro(CPoolNode) 
     24  macro(CServiceNode) 
    2425} 
    2526 
  • XIOS3/trunk/src/object_factory_decl3.cpp

    r2408 r2458  
    1919  macro(CZoomDomainGroup) 
    2020  macro(CPoolNodeGroup) 
     21  macro(CServiceNodeGroup) 
    2122} 
    2223 
  • XIOS3/trunk/src/object_template_decl.cpp

    r2408 r2458  
    3737  template class CObjectTemplate<CExtractDomain>; 
    3838  template class CObjectTemplate<CPoolNode>; 
     39  template class CObjectTemplate<CServiceNode>; 
    3940   
    4041  template class CObjectTemplate<CContextGroup>; 
     
    6970  template class CObjectTemplate<CExtractDomainGroup>; 
    7071  template class CObjectTemplate<CPoolNodeGroup>; 
     72  template class CObjectTemplate<CServiceNodeGroup>; 
    7173 
    7274} 
  • XIOS3/trunk/src/server.cpp

    r2437 r2458  
    2121#include "servers_ressource.hpp" 
    2222#include "services.hpp" 
     23#include "pool_node.hpp" 
    2324#include <cstdio> 
    2425#include "workflow_graph.hpp" 
     
    188189      if (serversRessource->isServerLeader()) 
    189190      { 
    190         int nbRessources = ressourcesManager->getRessourcesSize() ; 
    191         if (!CXios::usingServer2) 
     191        // creating pool 
     192        CPoolNodeGroup::get("xios","pool_definition")->solveDescInheritance(true) ; 
     193        vector<CPoolNode*> pools = CPoolNodeGroup::get("xios","pool_definition")->getAllChildren(); 
     194        for(auto& pool : pools) pool->allocateRessources() ; 
     195         
     196        int nbRessources = ressourcesManager->getFreeRessourcesSize() ; 
     197        if (nbRessources>0) 
    192198        { 
    193           ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    194           servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 
    195           servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 
    196         } 
    197         else 
    198         { 
    199           int nprocsServer = nbRessources*CXios::ratioServer2/100.; 
    200           int nprocsGatherer = nbRessources - nprocsServer ; 
     199          if (!CXios::usingServer2) 
     200          { 
     201            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
     202            ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     203            servicesManager->createServices(CXios::defaultPoolId, CXios::defaultWriterId, CServicesManager::WRITER,nbRessources,1) ; 
     204            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultWriterId) ; 
     205          } 
     206          else 
     207          { 
     208            int nprocsServer = nbRessources*CXios::ratioServer2/100.; 
     209            int nprocsGatherer = nbRessources - nprocsServer ; 
    201210           
    202           int nbPoolsServer2 = CXios::nbPoolsServer2 ; 
    203           if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 
    204           ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
    205           servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 
    206           servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ; 
    207           servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ; 
    208  
    209  
     211            int nbPoolsServer2 = CXios::nbPoolsServer2 ; 
     212            if (nbPoolsServer2 == 0) nbPoolsServer2 = nprocsServer; 
     213            ressourcesManager->createPool(CXios::defaultPoolId, nbRessources) ; 
     214            ressourcesManager->waitPoolRegistration(CXios::defaultPoolId) ; 
     215            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultGathererId, CServicesManager::GATHERER, nprocsGatherer, 1) ; 
     216            servicesManager->createServicesOnto(CXios::defaultPoolId, CXios::defaultReaderId, CServicesManager::READER, CXios::defaultGathererId) ; 
     217            servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultWriterId, CServicesManager::WRITER, nprocsServer, nbPoolsServer2) ; 
     218          } 
    210219        } 
    211220//        servicesManager->createServices(CXios::defaultPoolId,  CXios::defaultServicesId, CServicesManager::ALL_SERVICES, nbRessources, 1) ; 
    212221      } 
     222/* 
     223      MPI_Request req ; 
     224      MPI_Status status ; 
     225      MPI_Ibarrier(xiosGlobalComm,&req) ; // be sure that all services are created now, could be remove later if more asynchronisity 
     226      int ok=false ; 
     227      while (!ok) 
     228      { 
     229        daemonsManager->eventLoop() ; 
     230        MPI_Test(&req,&ok,&status) ; 
     231      } 
     232*/ 
    213233      CTimer::get("XIOS initialize").suspend() ; 
    214234 
  • XIOS3/trunk/src/transport/legacy_context_client.cpp

    r2343 r2458  
    1313#include "server.hpp" 
    1414#include "services.hpp" 
     15#include "ressources_manager.hpp" 
    1516#include <boost/functional/hash.hpp> 
    1617#include <random> 
     
    110111      } 
    111112       
     113      MPI_Request req ; 
     114      MPI_Status status ; 
     115      MPI_Ibarrier(intraComm,&req) ; 
     116      int flag ; 
     117      MPI_Test(&req,&flag,&status) ; 
     118      while(!flag)  
     119      { 
     120        callGlobalEventLoop() ; 
     121        MPI_Test(&req,&flag,&status) ; 
     122      } 
     123 
     124 
    112125      timeLine++; 
    113126    } 
     
    126139     * \return whether the already allocated buffers could be used 
    127140    */ 
    128     bool CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, 
    129                                     bool nonBlocking /*= false*/) 
     141    void CLegacyContextClient::getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers) 
    130142    { 
    131143      list<int>::const_iterator itServer, itSize; 
     
    134146      list<CClientBuffer*>::iterator itBuffer; 
    135147      bool areBuffersFree; 
    136  
     148      
    137149      for (itServer = serverList.begin(); itServer != serverList.end(); itServer++) 
    138150      { 
     
    140152        if (it == buffers.end()) 
    141153        { 
     154          CTokenManager* tokenManager = CXios::getRessourcesManager()->getTokenManager() ; 
     155          size_t token = tokenManager->getToken() ; 
     156          while (!tokenManager->lockToken(token)) callGlobalEventLoop() ; 
    142157          newBuffer(*itServer); 
    143158          it = buffers.find(*itServer); 
     159          checkAttachWindows(it->second,it->first) ; 
     160          tokenManager->unlockToken(token) ; 
    144161        } 
    145162        bufferList.push_back(it->second); 
     
    177194        } 
    178195 
    179       } while (!areBuffersFree && !nonBlocking); 
     196      } while (!areBuffersFree); 
    180197      CTimer::get("Blocking time").suspend(); 
    181198 
    182       if (areBuffersFree) 
    183       { 
    184         for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
    185           retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 
    186       } 
    187       return areBuffersFree; 
     199      for (itBuffer = bufferList.begin(), itSize = sizeList.begin(); itBuffer != bufferList.end(); itBuffer++, itSize++) 
     200        retBuffers.push_back((*itBuffer)->getBuffer(timeLine, *itSize)); 
    188201   } 
    189202 
     
    225238      bufOut->put(sendBuff, 4);  
    226239      buffer->checkBuffer(true); 
    227        
     240/* 
    228241       // create windows dynamically for one-sided 
    229242      if (!isAttachedModeEnabled()) 
     
    254267      buffer->attachWindows(windows_[rank]) ; 
    255268      if (!isAttachedModeEnabled()) MPI_Barrier(winComm_[rank]) ; 
    256         
    257    } 
    258  
     269  */      
     270   } 
     271 
     272   void CLegacyContextClient::checkAttachWindows(CClientBuffer* buffer, int rank) 
     273   { 
     274      if (!buffer->isAttachedWindows()) 
     275      { 
     276           // create windows dynamically for one-sided 
     277        if (!isAttachedModeEnabled()) 
     278        {  
     279          CTimer::get("create Windows").resume() ; 
     280          MPI_Comm interComm ; 
     281          MPI_Intercomm_create(commSelf_, 0, interCommMerged_, clientSize+rank, 0, &interComm) ; 
     282          MPI_Intercomm_merge(interComm, false, &winComm_[rank]) ; 
     283          CXios::getMpiGarbageCollector().registerCommunicator(winComm_[rank]) ; 
     284          MPI_Comm_free(&interComm) ; 
     285          windows_[rank].resize(2) ; 
     286       
     287          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][0]); 
     288          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][0]) ; 
     289       
     290          MPI_Win_create_dynamic(MPI_INFO_NULL, winComm_[rank], &windows_[rank][1]);    
     291          CXios::getMpiGarbageCollector().registerWindow(windows_[rank][1]) ; 
     292 
     293          CTimer::get("create Windows").suspend() ; 
     294          buffer->attachWindows(windows_[rank]) ; 
     295          MPI_Barrier(winComm_[rank]) ; 
     296        } 
     297        else 
     298        { 
     299          winComm_[rank] = MPI_COMM_NULL ; 
     300          windows_[rank].resize(2) ; 
     301          windows_[rank][0] = MPI_WIN_NULL ; 
     302          windows_[rank][1] = MPI_WIN_NULL ; 
     303          buffer->attachWindows(windows_[rank]) ; 
     304        } 
     305 
     306      } 
     307    } 
     308 
     309 
     310   
    259311   /*! 
    260312   Verify state of buffers. Buffer is under pending state if there is no message on it 
  • XIOS3/trunk/src/transport/legacy_context_client.hpp

    r2343 r2458  
    4545 
    4646      // Functions to set/get buffers 
    47       bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
     47      void getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers); 
    4848      void newBuffer(int rank); 
     49      void checkAttachWindows(CClientBuffer* buffer , int rank) ; 
    4950      bool checkBuffers(list<int>& ranks); 
    5051      bool checkBuffers(void); 
  • XIOS3/trunk/src/transport/legacy_context_server.cpp

    r2433 r2458  
    9595 
    9696    traceOff(); 
    97     MPI_Improbe(MPI_ANY_SOURCE, 20,interComm,&flag,&message, &status); 
     97    MPI_Improbe(MPI_ANY_SOURCE, 20, interComm,&flag,&message, &status); 
    9898    traceOn(); 
    9999    if (flag==true) listenPendingRequest(message, status) ; 
  • XIOS3/trunk/src/transport/one_sided_context_client.hpp

    r2343 r2458  
    4141 
    4242      // Functions to set/get buffers 
    43       bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
     43//      bool getBuffers(const size_t timeLine, const list<int>& serverList, const list<int>& sizeList, list<CBufferOut*>& retBuffers, bool nonBlocking = false); 
    4444      void newBuffer(int rank); 
    4545      bool checkBuffers(list<int>& ranks); 
  • XIOS3/trunk/src/type/type_util.hpp

    r2408 r2458  
    6868    class CPoolNode ; 
    6969    class CPoolNodeGroup ; 
    70       
     70    class CServiceNode ; 
     71    class CServiceNodeGroup ; 
     72       
    7173  template <typename T> inline string getStrType(void); 
    7274 
     
    154156  macro(CPoolNode) 
    155157  macro(CPoolNodeGroup) 
     158  macro(CServiceNode) 
     159  macro(CServiceNodeGroup) 
    156160   
    157161#undef macro 
  • XIOS3/trunk/src/xml_parser_decl.cpp

    r2408 r2458  
    4343    macro( ExtractDomain ) 
    4444    macro( PoolNode ) 
     45    macro( ServiceNode ) 
    4546  } 
    4647} 
Note: See TracChangeset for help on using the changeset viewer.