Ignore:
Timestamp:
01/03/23 19:06:42 (19 months ago)
Author:
ymipsl
Message:

Implementation of files service on dev branch

YM

Location:
XIOS3/dev/XIOS_FILE_SERVICES/src/node
Files:
2 added
8 edited

Legend:

Unmodified
Added
Removed
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/context.cpp

    r2441 r2453  
    577577    if (commRank==0) 
    578578    { 
    579       CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions) ; 
     579      CXios::getServicesManager()->getServiceNbPartitions(poolId, serverId, 0, nbPartitions, true) ; 
    580580      for(int i=0 ; i<nbPartitions; i++) CXios::getContextsManager()->createServerContext(poolId, serverId, i, getContextId()) ; 
    581581    } 
     
    588588      parentServerContext_->createIntercomm(poolId, serverId, i, getContextId(), intraComm_, interCommClient, interCommServer) ; 
    589589      int type ;  
    590       if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type) ; 
     590      if (commRank==0) CXios::getServicesManager()->getServiceType(poolId, serverId, 0, type, true) ; 
    591591      MPI_Bcast(&type,1,MPI_INT,0,intraComm_) ; 
    592592      string fullServerId=CXios::getContextsManager()->getServerContextName(poolId, serverId, i, type, getContextId()) ; 
     
    610610  CATCH_DUMP_ATTR 
    611611   
     612   
     613  // obsolete 
    612614  void CContext::createServerInterComm(void)  
    613615  TRY 
     
    645647  } 
    646648  CATCH_DUMP_ATTR 
     649 
     650   
     651  void CContext::getServerInterComm(const string& poolId, const string& serviceId,  vector<pair<CContextClient*,CContextServer*>>& clientServers) 
     652  { 
     653    vector<pair<string, pair<CContextClient*,CContextServer*>>> retClientServers ; 
     654 
     655    auto it=serversMap_.find(make_pair(poolId,serviceId)) ; 
     656    if (it!=serversMap_.end()) clientServers=it->second ; 
     657    else 
     658    { 
     659      if (attached_mode) createServerInterComm(CClient::getPoolRessource()->getId(), getContextId()+"_"+serviceId, retClientServers) ; 
     660      else createServerInterComm(poolId, serviceId, retClientServers) ; 
     661      for(auto& retClientServer : retClientServers)  clientServers.push_back(retClientServer.second) ; 
     662       
     663      int serviceType ; 
     664      if (intraCommRank_==0) CXios::getServicesManager()->getServiceType(poolId, serviceId, 0, serviceType) ; 
     665      MPI_Bcast(&serviceType,1,MPI_INT,0,intraComm_) ; 
     666       
     667      for(auto& clientServer : clientServers)      
     668      { 
     669        if (serviceType==CServicesManager::WRITER) { writerClientOut_.push_back(clientServer.first)      ; writerServerOut_.push_back(clientServer.second) ; } 
     670        else if (serviceType==CServicesManager::READER) { readerClientOut_.push_back(clientServer.first) ; readerServerOut_.push_back(clientServer.second) ; } 
     671        else if (serviceType==CServicesManager::GATHERER) { writerClientOut_.push_back(clientServer.first) ; writerServerOut_.push_back(clientServer.second) ; } 
     672      } 
     673      serversMap_.insert(make_pair(make_pair(poolId,serviceId),clientServers)) ; 
     674    } 
     675 
     676  } 
     677 
     678  vector<CContextClient*> CContext::getContextClient(const string& poolId, const string& serviceId) 
     679  { 
     680    vector<pair<CContextClient*,CContextServer*>> clientServers ; 
     681    getServerInterComm(poolId, serviceId, clientServers ) ; 
     682    vector<CContextClient*> ret ; 
     683    for(auto& clientServer : clientServers) ret.push_back(clientServer.first) ; 
     684    return ret ; 
     685  } 
     686 
    647687 
    648688  void CContext::globalEventLoop(void) 
     
    783823        CContextServer* server ; 
    784824 
     825        /* 
    785826        if (writerClientOut_.size()!=0) 
    786827        { 
     
    803844          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
    804845        } 
     846        */ 
     847 
     848        for(int n=0; n<writerClientOut_.size() ; n++) 
     849        { 
     850          client=writerClientOut_[n] ; 
     851          server=writerServerOut_[n] ; 
     852 
     853          info(100)<<"DEBUG: context "<<getId()<<" Send client finalize to writer"<<endl ; 
     854          client->finalize(); 
     855          info(100)<<"DEBUG: context "<<getId()<<" Client finalize sent to writer"<<endl ; 
     856          bool bufferReleased; 
     857          do 
     858          { 
     859            client->eventLoop(); 
     860            bufferReleased = !client->havePendingRequests(); 
     861          } while (!bufferReleased); 
     862          info(100)<<"DEBUG: context "<<getId()<<" no pending request on writer ok"<<endl ; 
     863 
     864          bool notifiedFinalized=false ; 
     865          do 
     866          { 
     867            notifiedFinalized=client->isNotifiedFinalized() ; 
     868          } while (!notifiedFinalized) ; 
     869          server->releaseBuffers(); 
     870          client->releaseBuffers(); 
     871          info(100)<<"DEBUG: context "<<getId()<<" release client writer ok"<<endl ; 
     872        } 
     873         
    805874 
    806875        if (readerClientOut_.size()!=0) 
     
    879948 
    880949    
     950   void CContext::setDefaultServices(void) 
     951   { 
     952     defaultPoolWriterId_ = CXios::defaultPoolId ; 
     953     defaultPoolReaderId_ = CXios::defaultPoolId ; 
     954     defaultPoolGathererId_ = CXios::defaultPoolId ; 
     955     defaultWriterId_ = CXios::defaultWriterId ; 
     956     defaultReaderId_ = CXios::defaultReaderId ; 
     957     defaultGathererId_ = CXios::defaultGathererId ; 
     958     defaultUsingServer2_ = CXios::usingServer2 ; 
     959      
     960     if (!default_pool.isEmpty())  defaultPoolWriterId_ = defaultPoolReaderId_= defaultPoolGathererId_= default_pool ; 
     961     if (!default_pool_writer.isEmpty()) defaultPoolWriterId_ = default_pool_writer ; 
     962     if (!default_pool_reader.isEmpty()) defaultPoolReaderId_ = default_pool_reader ; 
     963     if (!default_pool_gatherer.isEmpty()) defaultPoolGathererId_ = default_pool_gatherer ; 
     964     if (!default_writer.isEmpty()) defaultWriterId_ = default_writer ; 
     965     if (!default_reader.isEmpty()) defaultWriterId_ = default_reader ; 
     966     if (!default_gatherer.isEmpty()) defaultGathererId_ = default_gatherer ; 
     967     if (!default_using_server2.isEmpty()) defaultUsingServer2_ = default_using_server2 ; 
     968   } 
     969 
    881970   /*! 
    882971   \brief Close all the context defintion and do processing data 
     
    894983 
    895984     CTimer::get("Context : close definition").resume() ; 
    896       
     985           
    897986     // create intercommunicator with servers.  
    898987     // not sure it is the good place to be called here  
    899      createServerInterComm() ; 
     988     //createServerInterComm() ; 
    900989 
    901990 
     
    9161005    // pour chacun des contextes. 
    9171006    solveDescInheritance(true); 
    918   
     1007    setDefaultServices() ; 
    9191008    // Check if some axis, domains or grids are eligible to for compressed indexed output. 
    9201009    // Warning: This must be done after solving the inheritance and before the rest of post-processing 
     
    10231112 
    10241113    // Distribute files between secondary servers according to the data size => assign a context to a file and then to fields 
    1025     if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles); 
    1026     //else if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledWriteModeFiles) file->setContextClient(client) ; 
     1114     
     1115    if (serviceType_==CServicesManager::CLIENT || serviceType_==CServicesManager::GATHERER) distributeFiles(this->enabledWriteModeFiles) ; 
     1116    /* 
     1117    if (serviceType_==CServicesManager::CLIENT ) 
     1118    {    
     1119      if (CXios::usingServer2) distributeFiles(this->enabledWriteModeFiles, defaultPoolGathererId_, defaultGathererId_); 
     1120      else distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1121    } 
     1122    if (serviceType_==CServicesManager::GATHERER ) distributeFiles(this->enabledWriteModeFiles, defaultPoolWriterId_, defaultWriterId_); 
     1123    */ 
    10271124 
    10281125    // client side, assign context for file reading 
    1029     if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
    1030      
     1126//    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientOut_[0]) ; 
     1127    if (serviceType_==CServicesManager::CLIENT) for(auto file : this->enabledReadModeFiles)  
     1128    { 
     1129      string poolReaderId ; 
     1130      string readerId ; 
     1131      file->getReaderServicesId(defaultPoolReaderId_, defaultReaderId_, poolReaderId, readerId) ; 
     1132      file->setContextClient(poolReaderId, readerId, 0) ; 
     1133    } 
     1134 
    10311135    // server side, assign context where to send file data read 
    10321136    if (serviceType_==CServicesManager::READER) for(auto file : this->enabledReadModeFiles) file->setContextClient(readerClientIn_[0]) ; 
     
    11631267    // fix size for each context client 
    11641268    for(auto& it : fieldBufferEvaluation) it.first->setBufferSize(it.second) ; 
     1269 
    11651270 
    11661271     CTimer::get("Context : close definition").suspend() ; 
     
    14361541   TRY 
    14371542   { 
     1543     map< pair<string,string>, vector<CFile*>> fileMaps ; 
     1544     for(auto& file : files) 
     1545     { 
     1546       string poolWriterId ; 
     1547       string poolGathererId ; 
     1548       string writerId  ; 
     1549       string gathererId  ; 
     1550       bool usingServer2 ; 
     1551 
     1552       file->getWriterServicesId(defaultUsingServer2_, defaultPoolWriterId_, defaultWriterId_, defaultPoolGathererId_, defaultGathererId_, 
     1553                                 usingServer2, poolWriterId, writerId, poolGathererId, gathererId) ; 
     1554       if (serviceType_==CServicesManager::CLIENT && usingServer2) fileMaps[make_pair(poolGathererId,gathererId)].push_back(file) ; 
     1555       else fileMaps[make_pair(poolWriterId,writerId)].push_back(file) ; 
     1556     } 
     1557     for(auto& it : fileMaps) distributeFilesOnSameService(it.second, it.first.first, it.first.second) ; 
     1558   } 
     1559   CATCH_DUMP_ATTR 
     1560 
     1561 
     1562   void CContext::distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1563   TRY 
     1564   { 
    14381565     bool distFileMemory=false ; 
    14391566     distFileMemory=CXios::getin<bool>("server2_dist_file_memory", distFileMemory); 
    14401567 
    1441      int nbPools = writerClientOut_.size(); 
    1442      if (nbPools==1) distributeFileOverOne(files) ; 
    1443      else if (distFileMemory) distributeFileOverMemoryBandwith(files) ; 
    1444      else distributeFileOverBandwith(files) ; 
    1445    } 
    1446    CATCH_DUMP_ATTR 
    1447  
    1448    void CContext::distributeFileOverOne(const vector<CFile*>& files) 
    1449    TRY 
    1450    { 
    1451     for(auto& file : files) file->setContextClient(writerClientOut_[0]) ; 
    1452    } 
    1453    CATCH_DUMP_ATTR 
    1454  
    1455    void CContext::distributeFileOverBandwith(const vector<CFile*>& files) 
     1568     auto writers = getContextClient(poolId, serviceId) ; 
     1569     int  nbPools = writers.size() ; 
     1570      
     1571     if (nbPools==1) distributeFileOverOne(files, poolId, serviceId) ; 
     1572     else if (distFileMemory) distributeFileOverMemoryBandwith(files, poolId, serviceId) ; 
     1573     else distributeFileOverBandwith(files, poolId, serviceId) ; 
     1574   } 
     1575   CATCH_DUMP_ATTR 
     1576 
     1577   void CContext::distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
     1578   TRY 
     1579   { 
     1580     for(auto& file : files) file->setContextClient(poolId, serviceId,0) ; 
     1581   } 
     1582   CATCH_DUMP_ATTR 
     1583 
     1584   void CContext::distributeFileOverBandwith(const vector<CFile*>& files, const string& poolId, const string& serviceId) 
    14561585   TRY 
    14571586   { 
     
    14591588      
    14601589     std::ofstream ofs(("distribute_file_"+getId()+".dat").c_str(), std::ofstream::out); 
    1461      int nbPools = writerClientOut_.size(); 
     1590     auto writers = getContextClient(poolId, serviceId) ; 
     1591     int nbPools = writers.size(); 
     1592     //int nbPools = writerClientOut_.size(); 
    14621593 
    14631594     // (1) Find all enabled files in write mode 
     
    15121643       dataSize=(*poolDataSize.begin()).first ; 
    15131644       j=(*poolDataSize.begin()).second ; 
    1514        dataSizeMap[i].second->setContextClient(writerClientOut_[j]); 
     1645       dataSizeMap[i].second->setContextClient(poolId, serviceId, j); 
    15151646       dataSize+=dataSizeMap[i].first; 
    15161647       poolDataSize.erase(poolDataSize.begin()) ; 
     
    15221653   CATCH_DUMP_ATTR 
    15231654 
    1524    void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList) 
    1525    TRY 
    1526    { 
    1527      int nbPools = writerClientOut_.size(); 
     1655   void CContext::distributeFileOverMemoryBandwith(const vector<CFile*>& filesList, const string& poolId, const string& serviceId) 
     1656   TRY 
     1657   { 
     1658     auto writers = getContextClient(poolId, serviceId) ; 
     1659     int nbPools = writers.size(); 
     1660     
    15281661     double ratio=0.5 ; 
    15291662     ratio=CXios::getin<double>("server2_dist_file_memory_ratio", ratio); 
     
    15911724         } 
    15921725       } 
    1593        filesList[i]->setContextClient(writerClientOut_[files[i].assignedServer_]) ; 
     1726       filesList[i]->setContextClient(poolId, serviceId, files[i].assignedServer_) ; 
    15941727       delete [] files[i].assignedGrid_ ; 
    15951728     } 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/context.hpp

    r2407 r2453  
    103103          
    104104         void initServer(MPI_Comm intraComm, int serviceType ); 
    105          void createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer)  ; 
    106   
    107          void createServerInterComm(void)  ; 
    108          void createServerInterComm_old(void)  ; 
    109          void createServerInterComm(const string& poolId, const string& serverId, vector<pair<string, pair<CContextClient*,CContextServer*>>>& clientServers ) ; 
    110   
    111  
     105          
    112106         bool isInitialized(void); 
    113107 
     
    123117 
    124118         bool isFinalized(void); 
    125  
    126119         void closeDefinition(void); 
    127120 
     
    159152 
    160153         // Distribute files (in write mode) among secondary-server pools according to the estimated data flux 
    161          void distributeFiles(const std::vector<CFile*>& files); 
    162          void distributeFileOverOne(const vector<CFile*>& files) ; //!< Distribute files over one single server (no distribution) 
    163          void distributeFileOverBandwith(const std::vector<CFile*>& files) ; //!< Distribute files overs servers to balance the I/O bandwith 
    164          void distributeFileOverMemoryBandwith(const std::vector<CFile*>& files) ; //!< Distribute files overs servers to minimize the memory consumption 
     154         void distributeFiles(const vector<CFile*>& files) ; 
     155         void distributeFilesOnSameService(const vector<CFile*>& files, const string& poolId, const string& serviceId) ; 
     156         void distributeFileOverOne(const vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files over one single server (no distribution) 
     157         void distributeFileOverBandwith(const std::vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files overs servers to balance the I/O bandwith 
     158         void distributeFileOverMemoryBandwith(const std::vector<CFile*>& files, const string& poolId, const string& serviceId) ; //!< Distribute files overs servers to minimize the memory consumption 
    165159          
    166160       public: 
     
    271265         bool setProcessingEvent(void) {isProcessingEvent_=true ;} 
    272266         bool unsetProcessingEvent(void) {isProcessingEvent_=false ;} 
    273          MPI_Comm getIntraComm(void) {return intraComm_ ;} 
    274          int getIntraCommRank(void) {return intraCommRank_;} 
    275          int getIntraCommSize(void) {return intraCommSize_;} 
    276  
     267          
    277268         void addCouplingChanel(const std::string& contextId, bool out) ; 
    278269 
     
    310301 
    311302      private: 
    312         std::string defaultReaderId ; 
    313         std::string defaultWriterId ; 
    314         std::string defaultGathererId ; 
     303        std::string defaultPoolWriterId_ ; 
     304        std::string defaultPoolReaderId_ ; 
     305        std::string defaultPoolGathererId_ ; 
     306        std::string defaultWriterId_ ; 
     307        std::string defaultReaderId_ ; 
     308        std::string defaultGathererId_ ; 
     309        bool defaultUsingServer2_ ; 
     310        void setDefaultServices(void) ; 
     311 
     312 
     313        std::map<std::pair<string,string>,std::vector<pair<CContextClient*,CContextServer*>>> serversMap_ ; 
    315314 
    316315        std::vector<CContextClient*> writerClientOut_ ; 
     
    342341        std::map<std::string, CContextServer*> couplerInServer_ ; 
    343342      public: 
     343         void createClientInterComm(MPI_Comm interCommClient, MPI_Comm interCommServer)  ; 
     344         void createServerInterComm(void)  ; // obsolete 
     345         void createServerInterComm_old(void)  ; 
     346         void createServerInterComm(const string& poolId, const string& serverId, vector<pair<string, pair<CContextClient*,CContextServer*>>>& clientServers ) ; 
     347         void getServerInterComm(const string& poolId, const string& serviceId,  vector<pair<CContextClient*,CContextServer*>>& clientServers) ; 
     348         vector<CContextClient*> getContextClient(const string& poolId, const string& serviceId) ; 
    344349         CContextClient* getCouplerInClient(const string& contextId) { return couplerInClient_[contextId] ;} 
    345350         CContextServer* getCouplerInServer(const string& contextId) { return couplerInServer_[contextId] ;} 
    346351         CContextClient* getCouplerOutClient(const string& contextId) { return couplerOutClient_[contextId] ;} 
    347352         CContextServer* getCouplerOutServer(const string& contextId) { return couplerOutServer_[contextId] ;} 
    348  
    349          CRegistry* registryIn=nullptr ;    //!< input registry which is read from file 
    350          CRegistry* registryOut=nullptr ;   //!< output registry which will be written into file at the finalize 
     353        
     354       public: // must be privatize using accessors 
     355         
     356        CRegistry* registryIn=nullptr ;    //!< input registry which is read from file 
     357        CRegistry* registryOut=nullptr ;   //!< output registry which will be written into file at the finalize 
    351358 
    352359 
     
    354361        int intraCommRank_ ; //! context intra communicator rank 
    355362        int intraCommSize_ ; //! context intra communicator size 
    356          
     363       public:  
     364        MPI_Comm getIntraComm(void) {return intraComm_ ;} 
     365        int getIntraCommRank(void) {return intraCommRank_;} 
     366        int getIntraCommSize(void) {return intraCommSize_;} 
    357367      private: 
    358368         shared_ptr<CEventScheduler> eventScheduler_ ; //! The local event scheduler for context 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/file.cpp

    r2409 r2453  
    2626      : CObjectTemplate<CFile>(), CFileAttributes() 
    2727      , vFieldGroup(), data_out(), enabledFields(), fileComm(MPI_COMM_NULL) 
    28       , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 
     28      , isOpen(false), checkRead(false), allZoneEmpty(false) 
    2929   { 
    3030     setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); 
     
    3535      : CObjectTemplate<CFile>(id), CFileAttributes() 
    3636      , vFieldGroup(), data_out(), enabledFields(), fileComm(MPI_COMM_NULL) 
    37       , isOpen(false), read_client(0), checkRead(false), allZoneEmpty(false) 
     37      , isOpen(false), checkRead(false), allZoneEmpty(false) 
    3838    { 
    3939      setVirtualFieldGroup(CFieldGroup::create(getId() + "_virtual_field_group")); 
     
    11171117   CATCH_DUMP_ATTR 
    11181118 
     1119   void CFile::getWriterServicesId(bool defaultUsingServer2_, const string& defaultPoolWriterId_, const string& defaultWriterId_, const string& defaultPoolGathererId_, const string& defaultGathererId_, 
     1120                                   bool& usingServer2, string& poolWriterId, string& writerId, string& poolGathererId, string& gathererId) 
     1121   { 
     1122     usingServer2 = defaultUsingServer2_ ; 
     1123     poolWriterId = defaultPoolWriterId_ ; 
     1124     writerId = defaultWriterId_ ; 
     1125     poolGathererId = defaultPoolGathererId_ ; 
     1126     gathererId = defaultGathererId_ ; 
     1127 
     1128     if (!using_server2.isEmpty()) usingServer2 = using_server2; 
     1129     if (!pool_writer.isEmpty()) poolWriterId = pool_writer ; 
     1130     if (!writer.isEmpty()) writerId = writer; 
     1131     if (!pool_gatherer.isEmpty()) poolGathererId = pool_gatherer; 
     1132     if (!gatherer.isEmpty()) gathererId = gatherer; 
     1133   } 
     1134 
     1135   void CFile::getReaderServicesId(const string& defaultPoolReaderId_, const string& defaultReaderId_, string& poolReaderId, string& readerId) 
     1136   { 
     1137     poolReaderId = defaultPoolReaderId_ ; 
     1138     readerId = defaultReaderId_ ; 
     1139     
     1140     if (!pool_reader.isEmpty()) poolReaderId = pool_reader ; 
     1141     if (!reader.isEmpty()) readerId = reader; 
     1142   } 
     1143 
     1144   void CFile::setContextClient(const string& defaultPoolId, const string& defaultServiceId, int partitionId) 
     1145   TRY 
     1146   { 
     1147     CContext* context = CContext::getCurrent(); 
     1148     vector<CContextClient*> clients = context->getContextClient(defaultPoolId, defaultServiceId) ; 
     1149     setContextClient(clients[partitionId]) ; 
     1150   } 
     1151   CATCH_DUMP_ATTR 
     1152 
     1153 
    11191154   void CFile::setContextClient(CContextClient* newContextClient) 
    11201155   TRY 
     
    11361171   CATCH_DUMP_ATTR 
    11371172 
    1138    void CFile::setReadContextClient(CContextClient* readContextclient) 
    1139    TRY 
    1140    { 
    1141      read_client = readContextclient; 
    1142    } 
    1143    CATCH_DUMP_ATTR 
    1144  
    1145    CContextClient* CFile::getReadContextClient() 
    1146    TRY 
    1147    { 
    1148      return read_client; 
    1149    } 
    1150    CATCH_DUMP_ATTR 
    1151  
     1173    
    11521174   /*! 
    11531175   \brief Send a message to create a field on server side 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/file.hpp

    r2326 r2453  
    128128         CVariable* addVariable(const string& id = ""); 
    129129         CVariableGroup* addVariableGroup(const string& id = ""); 
     130          
     131          
     132         void getWriterServicesId(bool defaultUsingServer2_, const string& defaultPoolWriterId_, const string& defaultWriterId_, const string& defaultPoolGathererId_, const string& defaultGathererId_, 
     133                                  bool& usingServer2, string& poolWriterId, string& writerId, string& poolGathererId, string& gathererId) ; 
     134         void getReaderServicesId(const string& defaultPoolReaderId_, const string& defaultReaderId_, string& poolReaderId, string& readerId) ; 
     135  
     136 
     137         void setContextClient(const string& defaultPoolId, const string& defaultServiceId, int partitionId) ; 
    130138         void setContextClient(CContextClient* newContextClient); 
    131139         CContextClient* getContextClient(); 
    132  
    133          void setReadContextClient(CContextClient* newContextClient); 
    134          CContextClient* getReadContextClient(); 
    135140 
    136141         // Send info to server          
     
    192197         /// Propriétés privées /// 
    193198         CContextClient* client; 
    194          CContextClient* read_client; // Context client for reading (channel between server 1 and client) 
    195199         CFieldGroup* vFieldGroup; 
    196200         CVariableGroup* vVariableGroup; 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/node_enum.hpp

    r2408 r2453  
    4444         eExtractDomain, 
    4545         ePoolNode,gPoolNode, 
     46         eServiceNode,gServiceNode, 
    4647//         eService, gService 
    4748       } ENodeType; 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/node_type.hpp

    r2408 r2453  
    3434#include "extract_domain.hpp" 
    3535#include "pool_node.hpp" 
     36#include "service_node.hpp" 
    3637 
    3738 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/pool_node.cpp

    r2408 r2453  
    11#include "pool_node.hpp" 
     2#include "cxios.hpp" 
     3#include<cmath> 
    24 
    35namespace xios 
     
    57   
    68  CPoolNode::CPoolNode(void) : CObjectTemplate<CPoolNode>(), CPoolNodeAttributes() 
    7   { /* Ne rien faire de plus */ } 
     9  {  
     10    setVirtualServiceNodeGroup(CServiceNodeGroup::create(getId() + "_virtual_service_node_group")); 
     11  } 
    812 
    913  CPoolNode::CPoolNode(const StdString & id) : CObjectTemplate<CPoolNode>(id), CPoolNodeAttributes() 
    10   { /* Ne rien faire de plus */ } 
     14  {  
     15    setVirtualServiceNodeGroup(CServiceNodeGroup::create(getId() + "_virtual_service_node_group")); 
     16  } 
    1117 
    1218  CPoolNode::~CPoolNode(void) 
     
    1723  { 
    1824    SuperClass::parse(node); 
     25    if (node.goToChildElement()) 
     26    { 
     27      do 
     28      { 
     29        if (node.getElementName()=="service" || node.getElementName()=="service_group") this->getVirtualServiceNodeGroup()->parseChild(node); 
     30      } while (node.goToNextElement()); 
     31      node.goToParentElement(); 
     32    } 
    1933  } 
    2034 
    21  
     35  void CPoolNode::allocateRessources(void) 
     36  { 
     37    int nonEmpty=0 ; 
     38    if (!nprocs.isEmpty()) nonEmpty++ ; 
     39    if (!global_fraction.isEmpty()) nonEmpty++ ; 
     40    if (!remain_fraction.isEmpty()) nonEmpty++ ; 
     41    if (!remain.isEmpty()) nonEmpty++ ; 
     42    if (nonEmpty==0) ERROR("void CPoolNode::allocateRessources(void)",<<"A number a ressource for allocate pool must be specified."  
     43                           <<"At least attributes, <nprocs> or <global_fraction> or <remain_fraction> or <remain> must be specified") 
     44    else if (nonEmpty>1) ERROR("void CPoolNode::allocateRessources(void)",<<"Only one of these attributes : <nprocs> or <global_fraction>" 
     45                               <<" or <remain_fraction> or <remain> must be specified to determine allocated ressouces." 
     46                               <<" More than one is currently specified") 
     47    auto ressourcesManager=CXios::getRessourcesManager() ; 
     48    int nbRessources ; 
     49    int globalRessources = ressourcesManager->getRessourcesSize() ; 
     50    int freeRessources =   ressourcesManager->getFreeRessourcesSize() ; 
     51    if (!nprocs.isEmpty()) nbRessources = nprocs ; 
     52    if (!global_fraction.isEmpty()) nbRessources = std::round(globalRessources * global_fraction) ; 
     53    if (!remain_fraction.isEmpty()) nbRessources = std::round(freeRessources * remain_fraction) ; 
     54    if (!remain.isEmpty()) nbRessources = freeRessources ; 
     55    if (nbRessources>freeRessources) 
     56      ERROR("void CPoolNode::allocateRessources(void)",<<"Cannot allocate required ressources for the pool." 
     57                                                       <<"  Required is : "<<nbRessources<<"  but free ressource is currently : "<<freeRessources) 
     58    string poolId ; 
     59    if (!name.isEmpty()) poolId=name ; 
     60    else if (!hasAutoGeneratedId() ) poolId=getId() ; 
     61    else ERROR("void CPoolNode::allocateRessources(void)",<<"Pool has no name or id, attributes <id> or <name> must be specified") 
     62    ressourcesManager->createPool(poolId, nbRessources) ; 
     63    ressourcesManager->waitPoolRegistration(poolId) ; 
     64    auto services=this->getAllServiceNodes() ; 
     65    for(auto& service : services) service->allocateRessources(poolId) ; 
     66  } 
    2267} 
    2368 
  • XIOS3/dev/XIOS_FILE_SERVICES/src/node/pool_node.hpp

    r2408 r2453  
    1010#include "group_factory.hpp" 
    1111#include "declare_group.hpp" 
     12#include "service_node.hpp" 
    1213 
    1314 
     
    5859    public: 
    5960      virtual void parse(xml::CXMLNode & node); 
     61      void allocateRessources(void) ; 
     62 
     63    private: 
     64      std::vector<CServiceNode*> getAllServiceNodes(void) const {return this->vServiceNodeGroup->getAllChildren();} 
     65      CServiceNodeGroup* getVirtualServiceNodeGroup(void) const {return this->vServiceNodeGroup; } 
     66      void setVirtualServiceNodeGroup(CServiceNodeGroup* newVServiceNodeGroup) { this->vServiceNodeGroup = newVServiceNodeGroup; } 
     67      CServiceNodeGroup* vServiceNodeGroup; 
    6068 
    6169  }; // class CPoolNode 
Note: See TracChangeset for help on using the changeset viewer.